Pandas UDFs#

When coding with Spark, you will generally want to try to use native Spark functions wherever possible (i.e. functions in pyspark.sql.functions for PySpark). However, there are cases where no Spark function exists for what you need. In these cases, the next best option is a Pandas UDF.
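For example, converting a column of text to upper case needs no UDF at all. Here is a minimal sketch, assuming a dataframe df with a string column animal_group and pyspark.sql.functions imported as F:

import pyspark.sql.functions as F

# Prefer a built-in column function over any kind of UDF where one exists
df = df.withColumn('animal_group_upper', F.upper(F.col('animal_group')))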

Why are Pandas UDFs better than Python UDFs?#

Ordinary Python UDFs are not particularly efficient in Spark because they cannot be executed inside the Java Virtual Machine (JVM), the platform on which Spark runs, and must instead run in a separate Python process. Each row of the dataframe is serialised and deserialised as it passes between the JVM and the Python runtime, which causes large (de)serialisation overheads, extra copies of the data in memory, and is therefore very slow.

Pandas UDFs (also known as vectorised UDFs) run on the Spark executors, so they process data in a distributed manner and allow vectorised operations. This means that Pandas UDFs can work on a whole data partition at once instead of just one row at a time like Python UDFs. This vectorisation is achieved by using Apache Arrow to transfer data between the JVM and the Python processes with minimal copying of data in memory and low (de)serialisation overheads. Essentially, Apache Arrow acts as a middleman, storing a single copy of the data which can be accessed by both the Python and Java processes.

Types of Pandas UDFs#

Spark 2.4 supports three types of Pandas UDF:

  • Scalar

  • Grouped Map

  • Grouped Aggregate

Spark 3.x supports the following additional types:

  • Scalar Iterator

  • Map

  • Cogrouped Map

This demo will cover the three types supported in Spark 2.4. Additional information on the other types for Spark 3.x can be found here.
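Although this demo sticks to Spark 2.4, here is a hedged sketch of the Spark 3 style for illustration only: a Scalar Iterator Pandas UDF declared with Python type hints rather than a PandasUDFType (the function name and logic are made up):

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Spark 3 infers the UDF type from the type hints
@pandas_udf("double")
def add_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Each element of the iterator is one Arrow batch of the column
    for batch in batches:
        yield batch + 1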

Declaring Pandas UDFs in Spark#

You can create a Pandas UDF in a number of ways.

  • Using the @pandas_udf(<type>, F.PandasUDFType.<type>) decorator.

  • Assigning the UDF using <udf_name>_udf = F.pandas_udf(<udf_name>, returnType = <type>)

  • Using Python type hints (Spark 3.0+). Type hints make Pandas UDFs more descriptive, and in Spark 3 the UDF type is inferred from the hints. For more info see here.

This demo will use the pandas_udf decorator and Python type hints to annotate the functions with the input and output data types.
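For illustration, here are minimal sketches of the first two approaches (assuming pandas and PySpark are imported as pd, F and T, as in the setup below; the function bodies are placeholders):

# Approach 1: decorator (used in this demo)
@pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def double_values(s: pd.Series) -> pd.Series:
    return s * 2

# Approach 2: wrap an existing function with F.pandas_udf
def double_values_plain(s):
    return s * 2

double_values_udf = F.pandas_udf(double_values_plain,
                                 returnType=T.DoubleType(),
                                 functionType=F.PandasUDFType.SCALAR)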

Spark Setup for Pandas UDFs#

For Spark 3, to use Pandas UDFs you will have to install Pandas and PyArrow using:

pip install pandas
pip install pyarrow

For Spark 2.4 you will need to install older versions of Pandas and PyArrow. We found the following versions to be the highest that work with Pandas UDFs, although you should expect a deprecation warning:

pip install pandas==1.1.5
pip install pyarrow==0.14.0

The official PySpark Usage Guide for Pandas with Apache Arrow advises that compatibility issues may occur with Pandas above 0.19.2 and PyArrow above 0.8.0, so if you run into issues it may be best to switch to these lower versions.

You should also add configs to enable PyArrow optimisation, with a fallback in case PyArrow is not installed. Even if you are not using Pandas UDFs, the configs below can make conversion between Pandas and Spark dataframes more efficient.

.config('spark.sql.execution.arrow.enabled', 'true')
.config('spark.sql.execution.arrow.fallback.enabled', 'true')

For Spark 2.4 you will need to add the below additional configs to your Spark session for compatibility:

.config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
.config('spark.workerEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)

In our demo we are using a local Spark session and will be working with the animal rescue dataset used throughout this book. The Pandas UDFs applied to the rescue data are purely for demonstration; in a real-life scenario the equivalent Spark functions would be used instead. For other simple examples of Pandas UDFs please see the Databricks Introduction to Pandas UDFs.

First, we need to import packages and set up a Spark session with PyArrow optimisation and Spark 2.4 compatibility. Then we read the data, rename and add columns, and remove nulls if necessary:

import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import pyspark.sql.types as T
from pyspark.sql.functions import lit

spark = (SparkSession.builder.master("local[2]")
         .appName("pandas_udfs")
         .config('spark.sql.execution.arrow.enabled', 'true')
         .config('spark.sql.execution.arrow.fallback.enabled', 'true')
         .config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
         .config('spark.workerEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
         .getOrCreate())

import yaml
with open("../../config.yaml") as f:
     config = yaml.safe_load(f)

rescue_path_csv = config["rescue_path_csv"]
rescue = spark.read.csv(rescue_path_csv, header=True, inferSchema=True)

rescue = rescue.select(F.col('IncidentNumber').alias('incident_number'),
                       F.col('AnimalGroupParent').alias('animal_group'), 
                       F.col('Easting_rounded').alias('easting_rounded'), 
                       F.col('Northing_rounded').alias('northing_rounded'))

rescue = (rescue.withColumn('lfb_easting', lit(532066))
                .withColumn('lfb_northing', lit(180010)))


rescue.limit(10).toPandas()
/home/cdsw/.local/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
incident_number animal_group easting_rounded northing_rounded lfb_easting lfb_northing
0 139091 Dog 532350 170050 532066 180010
1 275091 Fox 534750 167550 532066 180010
2 2075091 Dog 528050 164950 532066 180010
3 2872091 Horse 504650 190650 532066 180010
4 3553091 Rabbit 554650 192350 532066 180010
5 3742091 Unknown - Heavy Livestock Animal 549350 184950 532066 180010
6 4011091 Dog 539050 186150 532066 180010
7 4211091 Dog 541350 186650 532066 180010
8 4306091 Squirrel 538750 163350 532066 180010
9 4715091 Dog 535450 186750 532066 180010

Scalar Pandas UDFs#

Scalar Pandas UDFs take in a pandas.Series and output a pandas.Series.

Spark executes scalar Pandas UDFs by splitting each partition's columns into batches and serialising each batch into a pandas.Series object. The UDF is then called on each of these Series objects as a subset of the data, and the results are concatenated together and returned as a pandas.Series.
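The size of these batches can be controlled through a Spark config (the default is 10,000 records per batch), for example:

.config('spark.sql.execution.arrow.maxRecordsPerBatch', '10000')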

Be aware that scalar UDFs may give incorrect results when calculating quantities such as means and standard deviations if your dataframe is spread over multiple partitions, because each batch only sees a subset of the data. For more info on this and how to overcome the issue see this towards data science post.
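As a hedged illustration of the problem (df and value are hypothetical names), a scalar UDF that centres a column using its mean will actually use the mean of each batch rather than of the whole column, whereas computing the mean with a native aggregation first avoids this:

# Each call only sees one batch, so value.mean() is a per-batch mean
@pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def centre(value: pd.Series) -> pd.Series:
    return value - value.mean()

# Safer: compute the global mean with a native aggregation, then subtract it
global_mean = df.agg(F.mean('value')).collect()[0][0]
df = df.withColumn('centred', F.col('value') - F.lit(global_mean))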

In the decorator of a scalar Pandas UDF, the first argument is the data type of the returned column and the second argument is the UDF type. Here is a simple scalar Pandas UDF to find the distance between the London Fire Brigade (lfb) and the location of an animal-related incident. The inputs to the function are eastings and northings, coordinates that describe how far east or north a point is; two values are supplied for the lfb location and two for the incident location. The function calculates the straight-line distance between the two points and converts the result to kilometres.

Please note that the following example is used only to illustrate how to write a Pandas UDF; it is not necessarily the most efficient approach, as the same calculation can be written with native PySpark functions.

@pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def distance_to_incident(easting_rounded: pd.Series, lfb_easting: pd.Series,
                         northing_rounded: pd.Series, lfb_northing: pd.Series) -> pd.Series:
    # Pythagorean distance in metres, converted to kilometres
    distance = ((easting_rounded - lfb_easting)**2 + (northing_rounded - lfb_northing)**2)**0.5
    distance_km = distance / 1000
    return distance_km

lfb_distance = rescue.withColumn("distance", distance_to_incident('easting_rounded', 'lfb_easting', 'northing_rounded', 'lfb_northing'))
lfb_distance.limit(5).toPandas()
/home/cdsw/.local/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
incident_number animal_group easting_rounded northing_rounded lfb_easting lfb_northing distance
0 139091 Dog 532350 170050 532066 180010 9.964048
1 275091 Fox 534750 167550 532066 180010 12.745802
2 2075091 Dog 528050 164950 532066 180010 15.586271
3 2872091 Horse 504650 190650 532066 180010 29.408275
4 3553091 Rabbit 554650 192350 532066 180010 25.735436

Grouped Map Pandas UDFs#

Grouped map UDFs take in a pandas.DataFrame and output a pandas.DataFrame.

Grouped map UDFs follow the split-apply-combine pattern. The data is first split into groups using .groupBy(), the UDF is then applied to each group to produce a Pandas dataframe per group, and the resulting Pandas dataframes are combined into a new Spark dataframe which is returned.

In the decorator of a grouped map Pandas UDF, the first argument is the schema of the output dataframe and the second argument is the UDF type. Here is a grouped map UDF that, for each animal_group, compares the distance travelled by the London Fire Brigade to an incident with the mean distance travelled for that group. This example uses the lfb_distance dataframe output by the scalar UDF example above. The result is expressed as a percentage difference from the group mean rather than a distance in km.

@pandas_udf(lfb_distance.schema, F.PandasUDFType.GROUPED_MAP)
def dist_animal(pdf: pd.DataFrame) -> pd.DataFrame:
    # Express each distance as a percentage difference from the group mean
    distance = pdf.distance
    return pdf.assign(distance=(distance - distance.mean()) / distance.mean() * 100)

dist_by_animal = lfb_distance.groupBy('animal_group').apply(dist_animal)
dist_by_animal.limit(10).toPandas()
/home/cdsw/.local/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
incident_number animal_group easting_rounded northing_rounded lfb_easting lfb_northing distance
0 59442121 Unknown - Animal rescue from water - Farm animal 558450 189150 532066 180010 -5.407423
1 4149 Unknown - Animal rescue from water - Farm animal 552850 175450 532066 180010 -27.915184
2 012249-30012019 Unknown - Animal rescue from water - Farm animal 571350 177650 532066 180010 33.322606
3 188198101 Cow 528350 185350 532066 180010 -66.012881
4 207392101 Cow 559650 187050 532066 180010 48.723553
5 100125121 Cow 504050 191350 532066 180010 57.896324
6 113544121 Cow 551350 180850 532066 180010 0.838977
7 122937121 Cow 536350 195950 532066 180010 -13.771242
8 112376141 Cow 535950 196450 532066 180010 -11.749838
9 120222-06092016 Cow 536250 195550 532066 180010 -15.924893

Grouped Aggregate Pandas UDFs#

Grouped aggregate Pandas UDFs take in one or more pandas.Series and output a scalar.

Grouped aggregate Pandas UDFs are used alongside the .groupBy() and .agg() functions and are similar to the Spark .agg() function.

In the decorator of a grouped aggregate Pandas UDF, the first argument is the data type of the returned value and the second argument is the UDF type. Below is a grouped aggregate Pandas UDF to find the mean distance travelled by the London Fire Brigade for each animal_group.

@pandas_udf(T.DoubleType(), F.PandasUDFType.GROUPED_AGG)
def mean_distance(distance: pd.Series) -> float:
    # Return a single scalar per group
    return distance.mean()

mean_distance_travelled = (lfb_distance.groupBy('animal_group')
      .agg(mean_distance(lfb_distance['distance']).alias('mean_distance'))
      .orderBy('mean_distance', ascending = False))

mean_distance_travelled.limit(10).toPandas()
/home/cdsw/.local/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
animal_group mean_distance
0 Unknown - Animal rescue from water - Farm animal 29.518494
1 Hedgehog 21.457480
2 Bull 21.059503
3 Deer 19.603281
4 Cow 19.141692
5 Horse 19.112879
6 Budgie 18.338150
7 Fish 17.896080
8 Lizard 17.358752
9 Sheep 16.884412

Grouped aggregate Pandas UDFs can also be used with PySpark window functions, where each pandas.Series in the input represents a group or window. Note that in Spark 2.4 grouped aggregate UDFs do not support partial aggregation, so only unbounded windows are supported. Here is an example:

from pyspark.sql.window import Window

w = (Window.partitionBy('animal_group')
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

rescue = (lfb_distance.withColumn('mean_distance', mean_distance(lfb_distance['distance']).over(w))
                .orderBy('mean_distance', ascending=False))


rescue.limit(10).toPandas()
/home/cdsw/.local/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
incident_number animal_group easting_rounded northing_rounded lfb_easting lfb_northing distance mean_distance
0 59442121 Unknown - Animal rescue from water - Farm animal 558450 189150 532066 180010 27.922304 29.518494
1 4149 Unknown - Animal rescue from water - Farm animal 552850 175450 532066 180010 21.278352 29.518494
2 012249-30012019 Unknown - Animal rescue from water - Farm animal 571350 177650 532066 180010 39.354825 29.518494
3 144140091 Hedgehog 510750 177550 532066 180010 21.457480 21.457480
4 165500101 Bull 514650 191850 532066 180010 21.059503 21.059503
5 151097091 Deer 529850 192450 532066 180010 12.635832 19.603281
6 181559091 Deer 555150 192650 532066 180010 26.318067 19.603281
7 153698091 Deer 535150 198750 532066 180010 18.992068 19.603281
8 206935091 Deer 536150 162250 532066 180010 18.223519 19.603281
9 167858091 Deer 522650 192450 532066 180010 15.601752 19.603281
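For comparison, and as noted earlier, in a real pipeline these calculations would normally be written with native Spark functions rather than Pandas UDFs. Here is a rough sketch of the equivalent (the column names distance_native and mean_distance_native are illustrative):

# Native Spark equivalent of the scalar UDF and the windowed mean above
native = (lfb_distance
          .withColumn('distance_native',
                      F.sqrt(F.pow(F.col('easting_rounded') - F.col('lfb_easting'), 2) +
                             F.pow(F.col('northing_rounded') - F.col('lfb_northing'), 2)) / 1000)
          .withColumn('mean_distance_native', F.mean('distance_native').over(w)))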

Additional Resources#

For a more in-depth explanation of what happens under the hood with the different types of Pandas UDFs, see: