Pandas UDFs#
When coding with Spark, you will generally want to use native Spark functions wherever possible (i.e. functions in pyspark.sql.functions for PySpark). However, there are instances where there may not be a Spark function available to do what you need. In cases such as this, the next best option is to use a Pandas UDF.
Why are Pandas UDFs better than Python UDFs?#
Ordinary Python UDFs are not particularly efficient in Spark because they do not make full use of the Spark cluster and can only be executed in the Python runtime. As the code for a Python UDF cannot run in the Java Virtual Machine (JVM), the platform on which Spark runs, each row of the dataframe is serialised and deserialised between the Python runtime and the JVM. As you can imagine, this causes large (de)serialisation overheads and a lot of copying of data in memory, and is very slow!
Pandas UDFs (also known as vectorised UDFs) run on the Spark executors to process data in a distributed manner and allow for vectorised operations. This means that Pandas UDFs work on a whole data partition at once instead of one row at a time like Python UDFs. This vectorisation is achieved by using Apache Arrow to transfer data between the JVM and the Python processes on the executors, with very little copying of data in memory and low (de)serialisation overheads. Essentially, Apache Arrow acts as a middleman, storing a single copy of the data which can be accessed by both the Python and Java processes.
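To make the difference concrete, here is a minimal sketch of the same trivial calculation written first as an ordinary Python UDF and then as a scalar Pandas UDF. The function and column names are hypothetical and not part of the demo; the point is that the Pandas version is called once per batch of rows rather than once per row:
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import pandas_udf

# Ordinary Python UDF: called once per row, with each value crossing between the JVM and Python
@F.udf(T.DoubleType())
def add_one_python(value):
    return value + 1.0

# Scalar Pandas UDF: called once per Arrow batch, operating on a whole pandas.Series at a time
@pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def add_one_pandas(value: pd.Series) -> pd.Series:
    return value + 1.0

# Hypothetical usage: df = df.withColumn('plus_one', add_one_pandas('value'))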
Types of Pandas UDFs#
Spark 2.4 supports three types of Pandas UDFs:
Scalar
Grouped Map
Grouped Aggregate
Spark 3.x also supports the following additional types:
Scalar Iterator
Map
Cogrouped Map
This demo will cover the three types supported in Spark 2.4. Additional information on the other types for Spark 3.x can be found here.
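For reference, here is a minimal sketch of one of the Spark 3.x styles, a Scalar Iterator UDF declared purely with Python type hints; the function and column names are hypothetical:
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Spark 3.x Scalar Iterator UDF: receives an iterator of batches and yields an iterator of
# results, which is useful when there is expensive setup to perform once per partition
@pandas_udf('double')
def add_one_iterator(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in batches:
        yield batch + 1.0

# Hypothetical usage: df = df.withColumn('plus_one', add_one_iterator('value'))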
Declaring Pandas UDFs in Spark#
You can create a Pandas UDF in a number of ways.
Using the @pandas_udf(<type>, F.PandasUDFType.<type>) decorator.
Assigning the UDF using <udf_name>_udf = F.pandas_udf(<udf_name>, returnType=<type>).
Using Python type hints. Type hints can be used to make Pandas UDFs more descriptive. For more info see here.
This demo will use the pandas_udf decorator and Python type hints to annotate the functions with their input and output data types. Both declaration styles are sketched below.
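As a rough sketch, the same doubling function could be declared either with the decorator or by wrapping a plain function with F.pandas_udf; the function and column names here are hypothetical:
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import pandas_udf

# Option 1: the decorator, passing the return type and the UDF type
@pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def double_values(value: pd.Series) -> pd.Series:
    return value * 2

# Option 2: wrap an ordinary function with F.pandas_udf
def double_values_plain(value: pd.Series) -> pd.Series:
    return value * 2

double_values_udf = F.pandas_udf(double_values_plain,
                                 returnType=T.DoubleType(),
                                 functionType=F.PandasUDFType.SCALAR)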
Spark Setup for Pandas UDFs#
For Spark 3, to use Pandas UDFs you will have to install Pandas and PyArrow using:
pip install pandas
pip install pyarrow
For Spark 2.4 you will need to install older versions of Pandas and PyArrow. We found the following versions to be the highest that work with Pandas UDFs, although you should expect a deprecation warning:
pip install pandas==1.1.5
pip install pyarrow==0.14.0
The official PySpark Usage Guide for Pandas with Apache Arrow advises that compatibility issues may occur with Pandas >0.19.2 and PyArrow >0.8.0, so if you run into issues it may be best to switch to these lower versions.
You should also add configs to enable PyArrow optimisation, plus a fallback in case PyArrow is not installed. Even if you are not using Pandas UDFs, the configs below can make conversion between Pandas and Spark dataframes more efficient.
.config('spark.sql.execution.arrow.enabled', 'true')
.config('spark.sql.execution.arrow.fallback.enabled', 'true')
For Spark 2.4 you will need to add the below additional configs to your Spark session for compatibility:
.config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
.config('spark.workerEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
In our demo we are using a local Spark session and will be working with the animal rescue dataset used throughout this book. The examples of Pandas UDFs applied to the rescue data are purely for demonstration; in a real-life scenario there are native Spark functions that would be used instead. For other simple examples of Pandas UDFs please see the Databricks Introduction to Pandas UDFs.
Firstly, we need to import packages and set up a Spark session with PyArrow optimisation and Spark 2.4 compatibility enabled. We then read the data, rename and add columns and, if necessary, remove nulls:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import pyspark.sql.types as T
from pyspark.sql.functions import lit
spark = (SparkSession.builder.master("local[2]")
.appName("pandas_udfs")
.config('spark.sql.execution.arrow.enabled', 'true')
.config('spark.sql.execution.arrow.fallback.enabled', 'true')
.config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
.config('spark.workerEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
.getOrCreate())
import yaml
with open("../../config.yaml") as f:
    config = yaml.safe_load(f)
rescue_path_csv = config["rescue_path_csv"]
rescue = spark.read.csv(rescue_path_csv, header=True, inferSchema=True)
rescue = rescue.select(F.col('IncidentNumber').alias('incident_number'),
F.col('AnimalGroupParent').alias('animal_group'),
F.col('Easting_rounded').alias('easting_rounded'),
F.col('Northing_rounded').alias('northing_rounded'))
rescue = rescue.withColumn('lfb_easting', lit(532066)).withColumn('lfb_northing', lit(180010))
rescue.limit(10).toPandas()
/home/cdsw/.local/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
| | incident_number | animal_group | easting_rounded | northing_rounded | lfb_easting | lfb_northing |
|---|---|---|---|---|---|---|
0 | 139091 | Dog | 532350 | 170050 | 532066 | 180010 |
1 | 275091 | Fox | 534750 | 167550 | 532066 | 180010 |
2 | 2075091 | Dog | 528050 | 164950 | 532066 | 180010 |
3 | 2872091 | Horse | 504650 | 190650 | 532066 | 180010 |
4 | 3553091 | Rabbit | 554650 | 192350 | 532066 | 180010 |
5 | 3742091 | Unknown - Heavy Livestock Animal | 549350 | 184950 | 532066 | 180010 |
6 | 4011091 | Dog | 539050 | 186150 | 532066 | 180010 |
7 | 4211091 | Dog | 541350 | 186650 | 532066 | 180010 |
8 | 4306091 | Squirrel | 538750 | 163350 | 532066 | 180010 |
9 | 4715091 | Dog | 535450 | 186750 | 532066 | 180010 |
Scalar Pandas UDFs#
Scalar Pandas UDFs take in a pandas.Series and output a pandas.Series.
Spark executes scalar Pandas UDFs by serialising each partition column into pandas.Series objects (essentially splitting the data into batches). The UDF is then called on each of these Series objects as a subset of the data, and the results are concatenated together and returned as a pandas.Series.
Be aware that scalar UDFs may produce incorrect results when calculating means and standard deviations if your dataframe is split over multiple partitions. For more info on this issue and how to overcome it, see this Towards Data Science post.
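To illustrate this, a scalar Pandas UDF only sees one batch of the data at a time, so any aggregate calculated inside it (such as a mean) is computed per batch rather than over the whole column. A minimal sketch, not applied to the rescue data:
# Sketch only: on a dataframe with more than one partition this does NOT subtract the
# overall mean, because value.mean() is recalculated for each batch of rows
@pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def subtract_mean(value: pd.Series) -> pd.Series:
    return value - value.mean()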
In the decorator of a scalar Pandas UDF the first argument is the data type of the output column and the second argument is the UDF type. Here is a simple scalar Pandas UDF to find the distance between the London Fire Brigade (LFB) and the location of an animal-related incident. The inputs to the function are eastings and northings, which are coordinates describing how far east or north a point is; two values are supplied for the LFB location and two for the incident location. The function calculates the distance between these two points and converts the answer into kilometres.
Please note that the following example is used to illustrate how to write a Pandas UDF; it is not necessarily the most efficient way to perform this calculation, as it can also be written with native PySpark functions.
@pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def distance_to_incident(easting_rounded: pd.Series, lfb_easting: pd.Series, northing_rounded: pd.Series, lfb_northing: pd.Series) -> pd.Series:
    distance = ((easting_rounded - lfb_easting)**2 + (northing_rounded - lfb_northing)**2)**0.5
    distance_km = distance / 1000
    return distance_km
lfb_distance = rescue.withColumn("distance", distance_to_incident('easting_rounded', 'lfb_easting', 'northing_rounded', 'lfb_northing'))
lfb_distance.limit(5).toPandas()
| | incident_number | animal_group | easting_rounded | northing_rounded | lfb_easting | lfb_northing | distance |
|---|---|---|---|---|---|---|---|
0 | 139091 | Dog | 532350 | 170050 | 532066 | 180010 | 9.964048 |
1 | 275091 | Fox | 534750 | 167550 | 532066 | 180010 | 12.745802 |
2 | 2075091 | Dog | 528050 | 164950 | 532066 | 180010 | 15.586271 |
3 | 2872091 | Horse | 504650 | 190650 | 532066 | 180010 | 29.408275 |
4 | 3553091 | Rabbit | 554650 | 192350 | 532066 | 180010 | 25.735436 |
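For comparison, here is a sketch of the same distance calculation written with native Spark functions only (using a new name, lfb_distance_native, for illustration); in practice this is the approach to prefer:
# Native PySpark version of the same calculation: no Pandas UDF needed
lfb_distance_native = rescue.withColumn(
    'distance',
    F.sqrt((F.col('easting_rounded') - F.col('lfb_easting'))**2
           + (F.col('northing_rounded') - F.col('lfb_northing'))**2) / 1000
)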
Grouped Map Pandas UDFs#
Grouped map UDFs take in a pandas.DataFrame and output a pandas.DataFrame.
Grouped map UDFs use the split-apply-combine pattern: the data is first split into groups using the .groupBy() function, the UDF is then mapped over each group to return a Pandas dataframe per group, and finally the results are combined and returned as a new Spark dataframe.
In the decorator of a grouped map Pandas UDF the first argument is the schema of the output dataframe and the second argument is the UDF type. Here is a grouped map UDF which compares the distance travelled by the London Fire Brigade to each incident with the mean distance travelled for that animal_group, expressed as a percentage difference from the group mean. This example uses the dataframe lfb_distance produced in the scalar UDF example above.
@pandas_udf(lfb_distance.schema, F.PandasUDFType.GROUPED_MAP)
def dist_animal(pdf: pd.DataFrame) -> pd.DataFrame:
    distance = pdf.distance
    return pdf.assign(distance=(distance - distance.mean()) / distance.mean() * 100)
dist_by_animal = lfb_distance.groupBy('animal_group').apply(dist_animal)
dist_by_animal.limit(10).toPandas()
| | incident_number | animal_group | easting_rounded | northing_rounded | lfb_easting | lfb_northing | distance |
|---|---|---|---|---|---|---|---|
0 | 59442121 | Unknown - Animal rescue from water - Farm animal | 558450 | 189150 | 532066 | 180010 | -5.407423 |
1 | 4149 | Unknown - Animal rescue from water - Farm animal | 552850 | 175450 | 532066 | 180010 | -27.915184 |
2 | 012249-30012019 | Unknown - Animal rescue from water - Farm animal | 571350 | 177650 | 532066 | 180010 | 33.322606 |
3 | 188198101 | Cow | 528350 | 185350 | 532066 | 180010 | -66.012881 |
4 | 207392101 | Cow | 559650 | 187050 | 532066 | 180010 | 48.723553 |
5 | 100125121 | Cow | 504050 | 191350 | 532066 | 180010 | 57.896324 |
6 | 113544121 | Cow | 551350 | 180850 | 532066 | 180010 | 0.838977 |
7 | 122937121 | Cow | 536350 | 195950 | 532066 | 180010 | -13.771242 |
8 | 112376141 | Cow | 535950 | 196450 | 532066 | 180010 | -11.749838 |
9 | 120222-06092016 | Cow | 536250 | 195550 | 532066 | 180010 | -15.924893 |
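Again for comparison, the same percentage difference from the group mean can be sketched with a native Spark window function rather than a grouped map UDF (the names w_group and dist_by_animal_native are used only for illustration):
from pyspark.sql.window import Window

# Native alternative: mean distance per animal_group calculated over a window
w_group = Window.partitionBy('animal_group')
dist_by_animal_native = lfb_distance.withColumn(
    'distance',
    (F.col('distance') - F.mean('distance').over(w_group)) / F.mean('distance').over(w_group) * 100
)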
Grouped Aggregate Pandas UDFs#
Grouped aggregate Pandas UDFs take in one or more pandas.Series and output a scalar.
Grouped aggregate Pandas UDFs are used alongside the .groupBy() and .agg() functions and are similar to Spark's built-in aggregate functions.
In the decorator of a grouped aggregate Pandas UDF the first argument is the data type of the returned value and the second argument is the UDF type. Below is an example of a grouped aggregate Pandas UDF to find the mean distance travelled by the London Fire Brigade for each animal_group.
@pandas_udf(T.DoubleType(), F.PandasUDFType.GROUPED_AGG)
def mean_distance(distance: pd.Series) -> float:
    return distance.mean()
mean_distance_travelled = (lfb_distance.groupBy('animal_group')
.agg(mean_distance(lfb_distance['distance']).alias('mean_distance'))
.orderBy('mean_distance', ascending=False))
mean_distance_travelled.limit(10).toPandas()
| | animal_group | mean_distance |
|---|---|---|
0 | Unknown - Animal rescue from water - Farm animal | 29.518494 |
1 | Hedgehog | 21.457480 |
2 | Bull | 21.059503 |
3 | Deer | 19.603281 |
4 | Cow | 19.141692 |
5 | Horse | 19.112879 |
6 | Budgie | 18.338150 |
7 | Fish | 17.896080 |
8 | Lizard | 17.358752 |
9 | Sheep | 16.884412 |
The grouped aggregate Pandas UDF can also be used with PySpark window functions, where each pandas.Series passed to the UDF represents a group or window. Note that in Spark 2.4 the grouped aggregate UDF does not support partial aggregations, so only unbounded windows can be used. Here is an example:
from pyspark.sql.window import Window
w = (Window.partitionBy('animal_group')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
rescue = (lfb_distance.withColumn('mean_distance', mean_distance(lfb_distance['distance']).over(w))
.orderBy('mean_distance', ascending=False))
rescue.limit(10).toPandas()
| | incident_number | animal_group | easting_rounded | northing_rounded | lfb_easting | lfb_northing | distance | mean_distance |
|---|---|---|---|---|---|---|---|---|
0 | 59442121 | Unknown - Animal rescue from water - Farm animal | 558450 | 189150 | 532066 | 180010 | 27.922304 | 29.518494 |
1 | 4149 | Unknown - Animal rescue from water - Farm animal | 552850 | 175450 | 532066 | 180010 | 21.278352 | 29.518494 |
2 | 012249-30012019 | Unknown - Animal rescue from water - Farm animal | 571350 | 177650 | 532066 | 180010 | 39.354825 | 29.518494 |
3 | 144140091 | Hedgehog | 510750 | 177550 | 532066 | 180010 | 21.457480 | 21.457480 |
4 | 165500101 | Bull | 514650 | 191850 | 532066 | 180010 | 21.059503 | 21.059503 |
5 | 151097091 | Deer | 529850 | 192450 | 532066 | 180010 | 12.635832 | 19.603281 |
6 | 181559091 | Deer | 555150 | 192650 | 532066 | 180010 | 26.318067 | 19.603281 |
7 | 153698091 | Deer | 535150 | 198750 | 532066 | 180010 | 18.992068 | 19.603281 |
8 | 206935091 | Deer | 536150 | 162250 | 532066 | 180010 | 18.223519 | 19.603281 |
9 | 167858091 | Deer | 522650 | 192450 | 532066 | 180010 | 15.601752 | 19.603281 |
Additional Resources#
For a more in-depth explanation of what happens under the hood of the different types of Pandas UDFs see: