Introduction to PySpark#

This article aims to give hands-on experience of working with the DataFrame API in PySpark. You can download this article as a notebook and run the code yourself by clicking on the download button above and selecting .ipynb.

We will not aim to cover all the PySpark DataFrame functionality or go into detail about how Spark works, but will instead focus on practicality by performing some common operations on an example dataset. There are a handful of exercises that you can complete while reading this article.

Prerequisites for this article are some basic knowledge of Python and pandas. If you are completely new to Python then it is recommended to complete an introductory course first; your organisation may have specific Python training. Other resources include the Python Tutorial and 10 Minutes to pandas. If you are an R user, the Introduction to sparklyr article follows a similar format.

PySpark: a quick introduction#

Although this article focusses on practical usage to enable you to quickly use PySpark, you do need to understand some basic theory of Spark and distributed computing.

Spark is a powerful tool for processing huge datasets efficiently. We can access Spark in Python with the PySpark package. Spark has DataFrames, consisting of rows and columns, similar to pandas. Many of the operations are also similar, if not identically named: e.g. you can select and add columns, filter rows, and group and aggregate.

The key difference between PySpark and pandas is where the DataFrame is processed:

  • pandas DataFrames are processed on the driver; this could be on a local machine using a desktop IDE such as Spyder or PyCharm, or on a server, e.g. in a dedicated Docker container (such as a CDSW session). The amount of data you can process is limited to the driver memory, so pandas is suitable for smaller data.

  • PySpark DataFrames are processed on the Spark cluster. This is a big pool of linked machines, called nodes. PySpark DataFrames are distributed into partitions, and are processed in parallel on the nodes in the Spark cluster. You can have much greater memory capacity with Spark, and so it is suitable for big data.

The DataFrame is also processed differently:

  • In pandas, the DataFrame changes in memory at each point, e.g. you could create a DataFrame by reading from a CSV file, select some columns, filter the rows, add a column and then write the data out. With each operation, the DataFrame is physically changing in memory. This can be useful for debugging as it is easy to see intermediate outputs.

  • In PySpark, DataFrames are lazily evaluated. We give Spark a set of instructions, called transformations, which are only evaluated when necessary, for instance to get a row count or write data out to a file; such an operation is referred to as an action. In the example above, the plan would only be triggered when the data are written out to a file.

For more detail on how Spark works, you can refer to the articles in the Understanding and Optimising Spark chapter of this book. Databricks: Learning Spark is another useful resource.

The pyspark Package#

As with all coding scripts or notebooks, the first thing we do is import the relevant packages. When coding in PySpark there are two particular imports we need.

Firstly we will import the SparkSession class, which we will use to create a SparkSession object for processing data using Spark.

The second is the pyspark.sql.functions module, which contains functions that can be applied to Spark DataFrames, or to columns within the DataFrames. The standard method is to import the functions module with the alias F, which means whenever we want to call a function from this module we write F.function_name().

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

We will also want to import the pandas module as pd as it makes viewing data much easier and neater, and yaml for reading the config file, which contains the file path of the source data.

import pandas as pd
import yaml

with open("../../config.yaml") as f:
    config = yaml.safe_load(f)

As pyspark.sql.functions is just a Python module, dir() (to list its contents) and help() both work as normal, although the easiest way to explore it is to look at the documentation. There are a lot of functions in this module and you are very unlikely to use them all.
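For example, here are a couple of quick ways to explore the module interactively; F.lower() is used purely as an arbitrary example function:

# List the public names in the functions module
[name for name in dir(F) if not name.startswith("_")][:10]

# Show the documentation for a single function
help(F.lower)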

Create a Spark session: SparkSession.builder#

With our SparkSession class imported we now want to create a connection to the Spark cluster. We use SparkSession.builder and assign this to spark. SparkSession.builder has many options; see the Guidance on Spark Sessions and also Example Spark Sessions to get an idea of what sized session to use.

For this article, we are using a tiny dataset by Spark standards, and so are using a local session. This also means that you can run this code without having access to a Spark cluster.

Note that only one Spark session can be running at once. If a session already exists then a new one will not be created; instead, the connection to the existing session will be used, hence the .getOrCreate() method.

spark = (SparkSession.builder.master("local[2]")
         .appName("pyspark-intro")
         .getOrCreate())
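As a quick demonstration, calling .getOrCreate() a second time returns the existing session rather than starting a new one (spark2 is just an illustrative name):

spark2 = SparkSession.builder.getOrCreate()
spark2 is spark  # True: the connection to the existing session is reused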

Reading data: spark.read.csv()#

For this article we will look at some open data on animal rescue incidents from the London Fire Brigade. The data are stored as a CSV, although parquet is the most common file format when using Spark. The reason for using CSV in this article is that it is a familiar file format that allows you to adapt this code easily for your own sample data. See the article on Reading Data in PySpark for more information.

Often your data will be large and stored using Hadoop, on the Hadoop Distributed File System (HDFS). This example uses a local file, enabling us to get started quickly; see the article on Data Storage for more information.

To read in from a CSV file, use spark.read.csv(). The file path is stored in the config file as rescue_path_csv. Using header=True means that the DataFrame will use the column headers from the CSV as the column names. CSV files do not contain information about the data types, so use inferSchema=True which makes Spark scan the file to infer the data types.

rescue_path_csv = config["rescue_path_csv"]
rescue = spark.read.csv(rescue_path_csv, header=True, inferSchema=True)

Preview data: .printSchema()#

To view the column names and data types use .printSchema(). This will not return any actual data.

rescue.printSchema()
root
 |-- IncidentNumber: string (nullable = true)
 |-- DateTimeOfCall: string (nullable = true)
 |-- CalYear: integer (nullable = true)
 |-- FinYear: string (nullable = true)
 |-- TypeOfIncident: string (nullable = true)
 |-- PumpCount: double (nullable = true)
 |-- PumpHoursTotal: double (nullable = true)
 |-- HourlyNotionalCost(£): integer (nullable = true)
 |-- IncidentNotionalCost(£): double (nullable = true)
 |-- FinalDescription: string (nullable = true)
 |-- AnimalGroupParent: string (nullable = true)
 |-- OriginofCall: string (nullable = true)
 |-- PropertyType: string (nullable = true)
 |-- PropertyCategory: string (nullable = true)
 |-- SpecialServiceTypeCategory: string (nullable = true)
 |-- SpecialServiceType: string (nullable = true)
 |-- WardCode: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- BoroughCode: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- StnGroundName: string (nullable = true)
 |-- PostcodeDistrict: string (nullable = true)
 |-- Easting_m: double (nullable = true)
 |-- Northing_m: double (nullable = true)
 |-- Easting_rounded: integer (nullable = true)
 |-- Northing_rounded: integer (nullable = true)

Show data: .show()#

The .show() function is an action that previews a DataFrame, meaning that all previous transformations will be run on the Spark cluster; often the plan will contain many transformations, but here we only have one, reading in the data.

By default .show() will display 20 rows and will truncate the columns to a fixed width.

When there are many columns the output can be hard to read, e.g. although the output of .show(3) looks fine in this article, in the notebook version every row will appear over several lines:

rescue.show(3)
+--------------+----------------+-------+-------+---------------+---------+--------------+---------------------+-----------------------+--------------------+-----------------+------------------+--------------------+-----------------+--------------------------+--------------------+---------+--------------------+-----------+-------+-------------+----------------+---------+----------+---------------+----------------+
|IncidentNumber|  DateTimeOfCall|CalYear|FinYear| TypeOfIncident|PumpCount|PumpHoursTotal|HourlyNotionalCost(£)|IncidentNotionalCost(£)|    FinalDescription|AnimalGroupParent|      OriginofCall|        PropertyType| PropertyCategory|SpecialServiceTypeCategory|  SpecialServiceType| WardCode|                Ward|BoroughCode|Borough|StnGroundName|PostcodeDistrict|Easting_m|Northing_m|Easting_rounded|Northing_rounded|
+--------------+----------------+-------+-------+---------------+---------+--------------+---------------------+-----------------------+--------------------+-----------------+------------------+--------------------+-----------------+--------------------------+--------------------+---------+--------------------+-----------+-------+-------------+----------------+---------+----------+---------------+----------------+
|        139091|01/01/2009 03:01|   2009|2008/09|Special Service|      1.0|           2.0|                  255|                  510.0|DOG WITH JAW TRAP...|              Dog|Person (land line)|House - single oc...|         Dwelling|      Other animal assi...|Animal assistance...|E05011467|Crystal Palace & ...|  E09000008|Croydon|      Norbury|            SE19|     null|      null|         532350|          170050|
|        275091|01/01/2009 08:51|   2009|2008/09|Special Service|      1.0|           1.0|                  255|                  255.0|ASSIST RSPCA WITH...|              Fox|Person (land line)|            Railings|Outdoor Structure|      Other animal assi...|Animal assistance...|E05000169|            Woodside|  E09000008|Croydon|     Woodside|            SE25| 534785.0|  167546.0|         534750|          167550|
|       2075091|04/01/2009 10:07|   2009|2008/09|Special Service|      1.0|           1.0|                  255|                  255.0|DOG CAUGHT IN DRA...|              Dog|   Person (mobile)|      Pipe or drain |Outdoor Structure|      Animal rescue fro...|Animal rescue fro...|E05000558|  Carshalton Central|  E09000029| Sutton|   Wallington|             SM5| 528041.0|  164923.0|         528050|          164950|
+--------------+----------------+-------+-------+---------------+---------+--------------+---------------------+-----------------------+--------------------+-----------------+------------------+--------------------+-----------------+--------------------------+--------------------+---------+--------------------+-----------+-------+-------------+----------------+---------+----------+---------------+----------------+
only showing top 3 rows
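One way of making wide DataFrames easier to read is the vertical argument of .show(), which prints each column on its own line (available from Spark 2.3 onwards):

# Display a single row with one column per line
rescue.show(1, vertical=True)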

Convert to pandas: .toPandas()#

The returned results can look pretty ugly with .show() when you have a lot of columns, so often the best way to view the data is to convert it to a pandas DataFrame. Be careful: the DataFrame is currently on the Spark cluster, which has lots of memory capacity, whereas pandas DataFrames are stored on the driver, which will have much less. Trying to use .toPandas() on a huge PySpark DataFrame will not work. If converting to a pandas DataFrame just to view the data, use .limit() to bring back only a small number of rows.

.toPandas() is an action and will process the whole plan on the Spark cluster. In this example, it will read the CSV file, take the first three rows, and then return the result to the driver as a pandas DataFrame.

rescue.limit(3).toPandas()
IncidentNumber DateTimeOfCall CalYear FinYear TypeOfIncident PumpCount PumpHoursTotal HourlyNotionalCost(£) IncidentNotionalCost(£) FinalDescription ... WardCode Ward BoroughCode Borough StnGroundName PostcodeDistrict Easting_m Northing_m Easting_rounded Northing_rounded
0 139091 01/01/2009 03:01 2009 2008/09 Special Service 1.0 2.0 255 510.0 DOG WITH JAW TRAPPED IN MAGAZINE RACK,B15 ... E05011467 Crystal Palace & Upper Norwood E09000008 Croydon Norbury SE19 NaN NaN 532350 170050
1 275091 01/01/2009 08:51 2009 2008/09 Special Service 1.0 1.0 255 255.0 ASSIST RSPCA WITH FOX TRAPPED,B15 ... E05000169 Woodside E09000008 Croydon Woodside SE25 534785.0 167546.0 534750 167550
2 2075091 04/01/2009 10:07 2009 2008/09 Special Service 1.0 1.0 255 255.0 DOG CAUGHT IN DRAIN,B15 ... E05000558 Carshalton Central E09000029 Sutton Wallington SM5 528041.0 164923.0 528050 164950

3 rows × 26 columns

See the article on Returning Data from Cluster to Driver for more details.

Select columns: .select()#

Often your data will have many columns that are not relevant, so we can use .select() to get just the ones that are of interest. In our example, we can reduce the number of columns returned so that the output of .show() is much neater.

Selecting columns is a transformation, and so will only be processed once an action is called. As such we are chaining this with .show() to preview the data:

(rescue
    .select("IncidentNumber", "DateTimeofCall", "FinalDescription")
    .show(5, truncate=False))
+--------------+----------------+-----------------------------------------+
|IncidentNumber|DateTimeofCall  |FinalDescription                         |
+--------------+----------------+-----------------------------------------+
|139091        |01/01/2009 03:01|DOG WITH JAW TRAPPED IN MAGAZINE RACK,B15|
|275091        |01/01/2009 08:51|ASSIST RSPCA WITH FOX TRAPPED,B15        |
|2075091       |04/01/2009 10:07|DOG CAUGHT IN DRAIN,B15                  |
|2872091       |05/01/2009 12:27|HORSE TRAPPED IN LAKE,J17                |
|3553091       |06/01/2009 15:23|RABBIT TRAPPED UNDER SOFA,B15            |
+--------------+----------------+-----------------------------------------+
only showing top 5 rows

Get the row count: .count()#

Sometimes your data will be small enough that you do not even need to use Spark. As such it is useful to know the row count, and then make the decision on whether to use Spark or just use pandas.

Note that unlike pandas DataFrames the row count is not automatically determined when the data are read in as a PySpark DataFrame. This is an example of lazy evaluation. As such, .count() is an action and has to be explicitly called.

rescue.count()
5898

Drop columns: .drop()#

.drop() is the opposite of .select(): we specify the columns that we want to remove. There are a lot of columns related to the location of the animal rescue incidents that we will not use, and these can be removed with .drop(). You do not need to specify the columns as a list with []; just use a comma separator.

Note that we have written over our previous DataFrame by re-assigning to rescue; unlike pandas DataFrames, PySpark DataFrames are immutable, so they cannot be modified in place.

We then use .printSchema() to verify that the columns have been removed.

rescue = rescue.drop(
    "WardCode",
    "BoroughCode",
    "Easting_m",
    "Northing_m",
    "Easting_rounded",
    "Northing_rounded"
)

rescue.printSchema()
root
 |-- IncidentNumber: string (nullable = true)
 |-- DateTimeOfCall: string (nullable = true)
 |-- CalYear: integer (nullable = true)
 |-- FinYear: string (nullable = true)
 |-- TypeOfIncident: string (nullable = true)
 |-- PumpCount: double (nullable = true)
 |-- PumpHoursTotal: double (nullable = true)
 |-- HourlyNotionalCost(£): integer (nullable = true)
 |-- IncidentNotionalCost(£): double (nullable = true)
 |-- FinalDescription: string (nullable = true)
 |-- AnimalGroupParent: string (nullable = true)
 |-- OriginofCall: string (nullable = true)
 |-- PropertyType: string (nullable = true)
 |-- PropertyCategory: string (nullable = true)
 |-- SpecialServiceTypeCategory: string (nullable = true)
 |-- SpecialServiceType: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- StnGroundName: string (nullable = true)
 |-- PostcodeDistrict: string (nullable = true)

Rename columns: .withColumnRenamed()#

The source data has the column names in CamelCase, but when using Python we generally prefer snake_case. The source data also has column names containing special characters (£), which can be problematic when writing out data as they are not compatible with all storage formats.

To rename columns, use .withColumnRenamed(). This has two arguments: the original column name, followed by the new name. You need a separate .withColumnRenamed() statement for each column name but can chain these together. Once again we are re-assigning the DataFrame as it is immutable.

rescue = (rescue
          .withColumnRenamed("IncidentNumber", "incident_number")
          .withColumnRenamed("AnimalGroupParent", "animal_group")
          .withColumnRenamed("CalYear", "cal_year")
          .withColumnRenamed("IncidentNotionalCost(£)", "total_cost")
          .withColumnRenamed("PumpHoursTotal", "job_hours")
          .withColumnRenamed("PumpCount", "engine_count"))

rescue.printSchema()
root
 |-- incident_number: string (nullable = true)
 |-- DateTimeOfCall: string (nullable = true)
 |-- cal_year: integer (nullable = true)
 |-- FinYear: string (nullable = true)
 |-- TypeOfIncident: string (nullable = true)
 |-- engine_count: double (nullable = true)
 |-- job_hours: double (nullable = true)
 |-- HourlyNotionalCost(£): integer (nullable = true)
 |-- total_cost: double (nullable = true)
 |-- FinalDescription: string (nullable = true)
 |-- animal_group: string (nullable = true)
 |-- OriginofCall: string (nullable = true)
 |-- PropertyType: string (nullable = true)
 |-- PropertyCategory: string (nullable = true)
 |-- SpecialServiceTypeCategory: string (nullable = true)
 |-- SpecialServiceType: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- StnGroundName: string (nullable = true)
 |-- PostcodeDistrict: string (nullable = true)

Be careful using .withColumnRenamed(): if the column does not exist in the DataFrame then nothing will happen, and no error will be raised until you try to use the new column name.
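For example, a misspelt column name is silently ignored: no error is raised and the column list is unchanged (NotAColumn is a hypothetical name used purely for illustration):

rescue.withColumnRenamed("NotAColumn", "new_name").columns == rescue.columns  # True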

Exercise 1#

Exercise 1a#

Rename the following columns in the rescue DataFrame:

FinalDescription –> description

PostcodeDistrict –> postcode_district

Exercise 1b#

Select these columns and the six columns that were renamed in the cell above; you should have eight in total. Reassign the result to the rescue DataFrame.

Exercise 1c#

Preview the first five rows of the DataFrame.

Exercise 1: Solution
# 1a: Chain two .withColumnRenamed() operations
rescue = (rescue
          .withColumnRenamed("FinalDescription", "description")
          .withColumnRenamed("PostcodeDistrict", "postcode_district"))

# 1b: Select the eight columns and assign to rescue
rescue = rescue.select(
    "incident_number",
    "animal_group",
    "cal_year",
    "total_cost",
    "job_hours",
    "engine_count",
    "description",
    "postcode_district")

# 1c Preview with .show(); can also use .limit(5).toPandas()
rescue.show(5)
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
|incident_number|animal_group|cal_year|total_cost|job_hours|engine_count|         description|postcode_district|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
|         139091|         Dog|    2009|     510.0|      2.0|         1.0|DOG WITH JAW TRAP...|             SE19|
|         275091|         Fox|    2009|     255.0|      1.0|         1.0|ASSIST RSPCA WITH...|             SE25|
|        2075091|         Dog|    2009|     255.0|      1.0|         1.0|DOG CAUGHT IN DRA...|              SM5|
|        2872091|       Horse|    2009|     255.0|      1.0|         1.0|HORSE TRAPPED IN ...|              UB9|
|        3553091|      Rabbit|    2009|     255.0|      1.0|         1.0|RABBIT TRAPPED UN...|              RM3|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
only showing top 5 rows

Filter rows: .filter() and F.col()#

Rows of PySpark DataFrames can be filtered with .filter(), which takes a logical condition. F.col() is used to reference a column in a DataFrame by name and is the most robust method to use.

For instance, if we want to select all the rows where animal_group is equal to Hamster, we can use F.col("animal_group") == "Hamster". Note the double equals sign used in a condition. We do not want to change the rescue DataFrame, so we assign the result to a new DataFrame, hamsters:

hamsters = rescue.filter(F.col("animal_group") == "Hamster")
hamsters.select("incident_number", "animal_group").show(3)
+---------------+------------+
|incident_number|animal_group|
+---------------+------------+
|       37009101|     Hamster|
|       58746101|     Hamster|
|      212716101|     Hamster|
+---------------+------------+
only showing top 3 rows

Alternatively you can pass a string condition into .filter(), although this can be messy to read if the condition itself contains a string value:

cats = rescue.filter("animal_group == 'Cat'")
cats.select("incident_number", "animal_group").show(3)
+---------------+------------+
|incident_number|animal_group|
+---------------+------------+
|        5186091|         Cat|
|        5724091|         Cat|
|        5770091|         Cat|
+---------------+------------+
only showing top 3 rows

Multiple conditions should be in brackets; putting each condition on a new line makes the code easier to read:

expensive_olympic_dogs = rescue.filter(
    (F.col("animal_group") == "Dog") &
    (F.col("total_cost") >= 750) &
    (F.col("cal_year") == 2012))

(expensive_olympic_dogs
    .select("incident_number", "animal_group", "cal_year", "total_cost")
    .show())
+---------------+------------+--------+----------+
|incident_number|animal_group|cal_year|total_cost|
+---------------+------------+--------+----------+
|       16209121|         Dog|    2012|     780.0|
|       17531121|         Dog|    2012|     780.0|
|       20818121|         Dog|    2012|     780.0|
|       38636121|         Dog|    2012|     780.0|
|       64764121|         Dog|    2012|    1040.0|
+---------------+------------+--------+----------+

If you want to filter the values in a column to match values in a list, for example where animal_group is Cat OR Dog, you can use isin(). This is equivalent to using multiple OR conditions on the same column, but isin() is a lot cleaner, particularly if you have a large number of values to filter by.

# Using isin()
cats_and_dogs = rescue.filter(F.col("animal_group").isin("Cat", "Dog"))

# Equivalent, using multiple OR conditions
cats_and_dogs = rescue.filter(
    (F.col("animal_group") == "Cat") |
    (F.col("animal_group") == "Dog"))

cats_and_dogs.limit(5).show()
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
|incident_number|animal_group|cal_year|total_cost|job_hours|engine_count|         description|postcode_district|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
|         139091|         Dog|    2009|     510.0|      2.0|         1.0|DOG WITH JAW TRAP...|             SE19|
|        2075091|         Dog|    2009|     255.0|      1.0|         1.0|DOG CAUGHT IN DRA...|              SM5|
|        4011091|         Dog|    2009|     255.0|      1.0|         1.0|DOG WITH HEAD TRA...|              E11|
|        4211091|         Dog|    2009|     255.0|      1.0|         1.0|LABRADOR FALLEN T...|              E12|
|        4715091|         Dog|    2009|     255.0|      1.0|         1.0|DOG STUCK IN MUD,...|               E5|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+

If you want to filter for values between a range of values, for example if you’re only interested in incidents that have occurred between 2015 and 2017, or incidents for which the total_cost is between 300 and 500, you can use between(). Note that between() is inclusive of the values in the upper and lower bounds.

Note: you may run into unexpected outputs if you try to use between() with non-numeric data types, so before using this function, ensure that the column you are filtering on is numeric (such as an integer, double or timestamp data type) by checking the schema with .printSchema().

specific_years = rescue.filter(F.col("cal_year").between(2015, 2017))
specific_years.limit(5).show()
+---------------+--------------------+--------+----------+---------+------------+--------------------+-----------------+
|incident_number|        animal_group|cal_year|total_cost|job_hours|engine_count|         description|postcode_district|
+---------------+--------------------+--------+----------+---------+------------+--------------------+-----------------+
|         304151|                 Dog|    2015|     295.0|      1.0|         1.0|   TERRIER IN BURROW|              KT8|
|         468151|Unknown - Domesti...|    2015|     295.0|      1.0|         1.0|PERSON STUCK UP TREE|              SE1|
|        1074151|                 Cat|    2015|     295.0|      1.0|         1.0|ASSIST RSPCA WITH...|              EN2|
|        1528151|                 Dog|    2015|     590.0|      2.0|         1.0|2 DOGS STUCK IN Q...|              CR8|
|        2217151|                Bird|    2015|     295.0|      1.0|         1.0|BIRD TRAPPED IN C...|              NW2|
+---------------+--------------------+--------+----------+---------+------------+--------------------+-----------------+

Exercise 2#

Create a new DataFrame which consists of all the rows where animal_group is equal to "Fox" and preview the first ten rows.

Exercise 2: Solution
# Use F.col() to filter, ensuring that a new DataFrame is created
# Can also use a string condition instead, e.g. rescue.filter("animal_group == 'Fox'")
foxes = rescue.filter(F.col("animal_group") == "Fox")

# Preview with .show() or .limit(10).toPandas()
foxes.show(10)
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
|incident_number|animal_group|cal_year|total_cost|job_hours|engine_count|         description|postcode_district|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
|         275091|         Fox|    2009|     255.0|      1.0|         1.0|ASSIST RSPCA WITH...|             SE25|
|       12451091|         Fox|    2009|     765.0|      3.0|         2.0|ASSIST RSPCA WITH...|              SE1|
|       12770091|         Fox|    2009|     255.0|      1.0|         1.0|FOX TRAPPED IN FE...|              KT3|
|       18386091|         Fox|    2009|     255.0|      1.0|         1.0|FOX TRAPPED IN FO...|              NW9|
|       20027091|         Fox|    2009|     255.0|      1.0|         1.0|FOX TRAPPED IN BA...|              W14|
|       30133091|         Fox|    2009|     255.0|      1.0|         1.0|FOX TRAPPED ABOVE...|              SE1|
|       73167091|         Fox|    2009|     260.0|      1.0|         1.0|ASSIST RSPCA WITH...|              RM2|
|       80733091|         Fox|    2009|     780.0|      3.0|         2.0|FOX TRAPPED IN DI...|              E14|
|      105324091|         Fox|    2009|     520.0|      2.0|         1.0|FOX STUCK IN WIND...|              SW7|
|      116595091|         Fox|    2009|     260.0|      1.0|         1.0|FOX TRAPPED IN FENCE|              HA5|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+
only showing top 10 rows

Adding Columns: .withColumn()#

.withColumn() can be used to either create a new column, or overwrite an existing one. The first argument is the new column name, the second is the value of the column. Often this will be derived from other columns. For instance, we do not have a column for how long an incident took in the data, but do have the columns available to derive this:

  • job_hours gives the total number of hours for engines attending the incident, e.g. if 2 engines attended for an hour job_hours will be 2

  • engine_count gives the number of engines in attendance

So to get the duration of the incident, which we will call incident_duration, we have to divide job_hours by engine_count:

rescue = rescue.withColumn(
    "incident_duration", 
    F.col("job_hours") / F.col("engine_count")
)

Now preview the data with .limit(5).toPandas():

rescue.limit(5).toPandas()
incident_number animal_group cal_year total_cost job_hours engine_count description postcode_district incident_duration
0 139091 Dog 2009 510.0 2.0 1.0 DOG WITH JAW TRAPPED IN MAGAZINE RACK,B15 SE19 2.0
1 275091 Fox 2009 255.0 1.0 1.0 ASSIST RSPCA WITH FOX TRAPPED,B15 SE25 1.0
2 2075091 Dog 2009 255.0 1.0 1.0 DOG CAUGHT IN DRAIN,B15 SM5 1.0
3 2872091 Horse 2009 255.0 1.0 1.0 HORSE TRAPPED IN LAKE,J17 UB9 1.0
4 3553091 Rabbit 2009 255.0 1.0 1.0 RABBIT TRAPPED UNDER SOFA,B15 RM3 1.0

Note that previewing the data took longer to process than defining the new column. Why? Remember that Spark is built on the concept of transformations and actions:

  • Transformations are lazily evaluated expressions. These form the set of instructions called the execution plan.

  • Actions trigger computation to be performed on the cluster and results returned to the driver. It is actions that trigger the execution plan.

Multiple transformations can be combined, as we did to preprocess the rescue DataFrame above. Only when an action is called, for example .toPandas(), .show() or .count(), are these transformations and the action executed on the cluster, after which the results are returned to the driver.
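If you want to inspect the plan without triggering it, .explain() prints the execution plan that Spark has built up so far and does not process any data; the exact output depends on your Spark version:

# Print the execution plan for the rescue DataFrame
rescue.explain()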

Sorting: .orderBy()#

An important Spark concept is that DataFrames are not ordered by default, unlike a pandas DF which has an index. Remember that a Spark DataFrame is distributed into partitions, and there is no guarantee of the order of the rows within these partitions, or which partition a particular row is on.

To sort the data, use .orderBy(). By default this will sort ascending; to sort descending, use ascending=False. To show the highest cost incidents:

(rescue
    .orderBy("total_cost", ascending=False)
    .select("incident_number", "total_cost", "animal_group")
    .show(10))
+---------------+----------+--------------------+
|incident_number|total_cost|        animal_group|
+---------------+----------+--------------------+
|098141-28072016|    3912.0|                 Cat|
|       48360131|    3480.0|               Horse|
|       62700151|    2980.0|               Horse|
|092389-09072018|    2664.0|               Horse|
|       49076141|    2655.0|                 Cat|
|       49189111|    2340.0|               Horse|
|       82423111|    2340.0|               Horse|
|      101755111|    2340.0|                Deer|
|030477-09032018|    2296.0|Unknown - Wild An...|
|028258-08032017|    2282.0|                 Cat|
+---------------+----------+--------------------+
only showing top 10 rows

Horses make up a lot of the more expensive calls, which makes sense, given that they are large animals.

There are actually multiple ways to sort data in PySpark; as well as .orderBy(), you can use .sort() or even an SQL expression. The same is true of sorting the data descending, where F.desc(column_name) can be used instead of ascending=False, as shown in the example below. The most important principle here is consistency; try to use the same syntax as your colleagues to make the code easier to read.
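For example, the following all sort the DataFrame by descending cost and produce the same ordering; the choice between them is purely stylistic:

rescue.orderBy("total_cost", ascending=False)
rescue.orderBy(F.desc("total_cost"))
rescue.sort(F.col("total_cost").desc())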

Note that sorting the DataFrame is an expensive operation, as the rows move between partitions. This is a key Spark concept called a shuffle. When you are ready to optimise your Spark code you will want to read the article on Shuffling.

Exercise 3#

Sort the incidents in terms of their duration, look at the top 10 and the bottom 10. Do you notice anything strange?

Exercise 3: Solution
# To get the top 10, sort the DF descending
top10 = (rescue
         .orderBy("incident_duration", ascending=False)
         .limit(10))

top10.show()

# The bottom 10 can just be sorted ascending
# Note that .tail() does not exist in Spark 2.4
bottom10 = (rescue
         .orderBy("incident_duration")
         .limit(10))

# When previewing the results, the incident_duration values are all null
bottom10.show()
+---------------+--------------------+--------+----------+---------+------------+--------------------+-----------------+-----------------+
|incident_number|        animal_group|cal_year|total_cost|job_hours|engine_count|         description|postcode_district|incident_duration|
+---------------+--------------------+--------+----------+---------+------------+--------------------+-----------------+-----------------+
|       48360131|               Horse|    2013|    3480.0|     12.0|         2.0|FOAL IN RIVER WAT...|               E4|              6.0|
|      137525091|               Horse|    2009|    1300.0|      5.0|         1.0|HORSE STUCK IN CANAL|              N18|              5.0|
|       18627122|               Horse|    2012|    1300.0|      5.0|         1.0|HORSE STUCK ON SO...|              EN3|              5.0|
|       92423121|Unknown - Domesti...|    2012|    1300.0|      5.0|         1.0|RUNNING CALL TO S...|              E14|              5.0|
|         955141|               Horse|    2014|    1450.0|      5.0|         1.0|ASSIST POLICE WIT...|               E4|              5.0|
|       62700151|               Horse|    2015|    2980.0|     10.0|         2.0|HORSE FALLEN INTO...|             TN14|              5.0|
|125704-02092018|               Horse|    2018|    1665.0|      5.0|         1.0|HORSE STUCK IN DI...|             DA17|              5.0|
|126939-05092018|               Horse|    2018|    1665.0|      5.0|         1.0|ASSIST RSPCA WITH...|             DA17|              5.0|
|       49076141|                 Cat|    2014|    2655.0|      9.0|         2.0|KITTEN TRAPPED IN...|              TW4|              4.5|
|       64764121|                 Dog|    2012|    1040.0|      4.0|         1.0|DOG TRAPPED IN BA...|              CR5|              4.0|
+---------------+--------------------+--------+----------+---------+------------+--------------------+-----------------+-----------------+

+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+-----------------+
|incident_number|animal_group|cal_year|total_cost|job_hours|engine_count|         description|postcode_district|incident_duration|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+-----------------+
|       35453121|         Cat|    2012|      null|     null|        null| CAT INJURED UP TREE|              W11|             null|
|        4920141|        Bird|    2014|      null|     null|        null|PIDGEON TRAPPED I...|              SW3|             null|
|       36995121|       Horse|    2012|      null|     null|        null|PONY COLLASPED IN...|              CR0|             null|
|      194986111|       Horse|    2011|      null|     null|        null|HORSE WITH NECK S...|              EN2|             null|
|       51412121|         Cat|    2012|      null|     null|        null|INJURED CAT IN DI...|               E8|             null|
|      208663091|       Horse|    2009|      null|     null|        null|3 HORSES LOOSE ON...|             UB10|             null|
|       18325122|         Dog|    2012|      null|     null|        null|DOG WITH HEAD STU...|              SE6|             null|
|       43265111|         Dog|    2011|      null|     null|        null|DOG IN DISTRESS I...|              SE1|             null|
|        2602131|         Cat|    2013|      null|     null|        null|CAT STUCK BETWEEN...|               W4|             null|
|      118613121|         Cat|    2012|      null|     null|        null|CAT FALLEN INTO R...|             SE16|             null|
+---------------+------------+--------+----------+---------+------------+--------------------+-----------------+-----------------+

Grouping and Aggregating: .groupBy(), .agg() and .alias()#

In most cases, we want to get insights into the raw data, for instance, by taking the sum or average of a column, or getting the largest or smallest values. This is key to what the Office for National Statistics does: we release statistics!

In Spark, grouping and aggregating is similar to pandas or SQL: we first group the data with .groupBy(), then aggregate it in some way with a function from the functions module inside .agg(), e.g. F.sum(), F.max(). For instance, to find the average cost by animal_group we use F.mean():

cost_by_animal = (rescue
                  .groupBy("animal_group")
                  .agg(F.mean("total_cost")))
cost_by_animal.show(5)
+--------------------+------------------+
|        animal_group|   avg(total_cost)|
+--------------------+------------------+
|Unknown - Animal ...| 709.6666666666666|
|                 Cow| 624.1666666666666|
|               Horse| 747.4350649350649|
|             Hamster|311.07142857142856|
|Unknown - Heavy L...|362.54545454545456|
+--------------------+------------------+
only showing top 5 rows

The new column has been returned as avg(total_cost). We want to give it a more sensible name and avoid using brackets. We saw earlier that we could use .withColumnRenamed() once the column has been created, but it is easier to use .alias() to rename the column directly in the aggregation:

cost_by_animal = (rescue
                  .groupBy("animal_group")
                  .agg(F.mean("total_cost").alias("average_cost")))

cost_by_animal.show(5)
+--------------------+------------------+
|        animal_group|      average_cost|
+--------------------+------------------+
|Unknown - Animal ...| 709.6666666666666|
|                 Cow| 624.1666666666666|
|               Horse| 747.4350649350649|
|             Hamster|311.07142857142856|
|Unknown - Heavy L...|362.54545454545456|
+--------------------+------------------+
only showing top 5 rows

Remember that Spark DataFrames are not ordered unless we explicitly sort them with .orderBy(); now that we have renamed the column to average_cost this is easy to do:

cost_by_animal.orderBy("average_cost", ascending=False).show(10)
+--------------------+------------------+
|        animal_group|      average_cost|
+--------------------+------------------+
|                Goat|            1180.0|
|                Bull|             780.0|
|                Fish|             780.0|
|               Horse| 747.4350649350649|
|Unknown - Animal ...| 709.6666666666666|
|                 Cow| 624.1666666666666|
|                Lamb|             520.0|
|            Hedgehog|             520.0|
|                Deer| 423.8829787234043|
|Unknown - Wild An...|390.03636363636366|
+--------------------+------------------+
only showing top 10 rows

It looks like Goat could be an outlier, as its average cost is significantly higher than that of the other high average cost groups. We can investigate this in more detail using .filter():

goats = rescue.filter(F.col("animal_group") == "Goat")
goats.count()
1

Just one expensive goat incident! Let's see the description:

goats.select("incident_number", "animal_group", "description").toPandas()
incident_number animal_group description
0 72214141 Goat GOAT TRAPPED BELOW GROUND LEVEL ON LEDGE

Note that we did not use .limit() before .toPandas() here. This is because we know the row count is tiny, and so there was no danger of overloading the driver with too much data.
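Before moving on, it is worth noting that several aggregations can be combined in a single .agg() call. A small sketch using the existing columns (incident_count is just an illustrative name):

(rescue
    .groupBy("animal_group")
    .agg(
        F.mean("total_cost").alias("average_cost"),
        F.count("incident_number").alias("incident_count"))
    .orderBy("average_cost", ascending=False)
    .show(5))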

Reading data from a Parquet file: spark.read.parquet()#

The next section covers how to join data in Spark, but before we do, we need to read in another dataset. In our rescue data, we have a column for the postcode district, which represents the first part of the postcode. We have data for the population by postcode district in another dataset, population.

These data are stored as a parquet file. Parquet files are the most efficient way to store data when using Spark. They are compressed and so take up much less storage space, and reading parquet files with Spark is many times quicker than reading CSVs. The drawback is that they are not human readable, although you can store them as a Hive table, which means they can easily be interrogated with SQL. See the article on Parquet files for more information.

The syntax for reading in a parquet file is similar to a CSV: spark.read.parquet(). There is no need for the header or inferSchema arguments as, unlike CSVs, parquet files already have the schema defined. We can then preview the data with .limit(5).toPandas():

population_path = config["population_path"]
population = spark.read.parquet(population_path)
population.limit(5).toPandas()
postcode_district population
0 DH7 41076
1 NW3 52376
2 NR4 22331
3 SO31 44742
4 CT18 14357

Joining Data: .join()#

Now we have read the population data in, we can join it to the rescue data to get the population by postcode. This can be done with the .join() method.

This article assumes that you are familiar with joins. Those who know SQL will be familiar with this term, although pandas and R users sometimes use the term merge. If you do not know how a join works, please read about joins in SQL first; the principles are the same in Spark. Joins are expensive in Spark as they involve shuffling the data and this can make larger joins slow. See the article on Optimising Joins for more information on how to make them more efficient.

.join() is a DataFrame method, so we start with the rescue DataFrame. The other arguments we need are:

  • other: the DataFrame on the right hand side of the join, population;

  • on: which specifies the mapping. Here we have a common column name and so can simply supply the column name;

  • how: the type of join to use, "left" in this case.

To make the code easier to read we have put these arguments on new lines:

rescue_with_pop = (
    rescue.join(
        population,
        on="postcode_district",
        how="left"))

Once again, note how quickly this code runs. This is because although a join is an expensive operation, we have only created the plan at this point. We need an action to run the plan and return a result; sort the joined DataFrame, subset the columns and then use .limit(5).toPandas():

rescue_with_pop = (rescue_with_pop
                   .orderBy("incident_number")
                   .select("incident_number", "animal_group", "postcode_district", "population"))

rescue_with_pop.limit(5).toPandas()
incident_number animal_group postcode_district population
0 000014-03092018M Unknown - Heavy Livestock Animal CR8 32307
1 000099-01012017 Dog BR2 44958
2 000260-01012017 Bird CR0 153812
3 000375-01012017 Dog TW8 20330
4 000477-01012017 Deer HA7 36046

Note: a common mistake is joining two DataFrames that share column names other than those given in the on argument. Although this will not raise an error when joining, if you try to do anything with one of the duplicated columns in the resulting DataFrame, such as modifying it using withColumn(), you will receive this error message: pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference column_name is ambiguous, could be: [column_name, column_name], as PySpark will be unsure which column you are referring to. If you try to rectify this by dropping the column, both columns will be dropped, which could potentially cause confusion.

To avoid this, rename any duplicated columns with withColumnRenamed() prior to the join, to ensure that there are no duplicate column names across the left and right DataFrames.
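As a minimal sketch using the population DataFrame from earlier: if we were to join population onto rescue_with_pop again, both would contain a population column, so we rename one of them first (postcode_population and rescue_with_pop_2 are just illustrative names):

# Rename the duplicated column before joining
population_renamed = population.withColumnRenamed("population", "postcode_population")

rescue_with_pop_2 = rescue_with_pop.join(
    population_renamed,
    on="postcode_district",
    how="left")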

Writing data: file choice#

In this article so far, we have been calling actions to preview the data, bringing back only a handful of rows each time. This is useful when developing and debugging code, but in production pipelines you will want to write the results.

The format in which the results are written out depends on what you want to do next with the data:

  • If the data are intended to be human readable, e.g. as the basis for a presentation, or as a publication on the ONS website, then you will likely want to output the data as a CSV

  • If the data are intended to be used as an input to another Spark process, then use parquet or a Hive table.

There are other use cases, e.g. JSON can be useful if you want the results to be analysed with a different programming language, although here we only focus on CSV and parquet. See the article on Reading and Writing Data for more information.

Write to a parquet: .write.parquet()#

To write out our DataFrame as a parquet file, use rescue_with_pop.write.parquet(), using the file path as the argument.

The key difference between writing out data with Spark and writing out data with pandas is that the data will be distributed, which means that multiple files will be created, stored in a parent folder. Spark can read in these parent folders as one DataFrame. There will be one file written out per partition of the DataFrame.

output_path_parquet = config["rescue_with_pop_path_parquet"]
rescue_with_pop.write.parquet(output_path_parquet)

It is worth looking at the raw data that is written out to see that it has been stored in several files in a parent folder.

When reading the data in, Spark will treat every individual file as a partition. See the article on Managing Partitions for more information.
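You can verify this by reading the folder back in and checking the number of partitions; the exact number will depend on how the data were written:

# Read the parquet folder back in; each file becomes a partition
rescue_reread = spark.read.parquet(output_path_parquet)
rescue_reread.rdd.getNumPartitions()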

Write to a CSV: .write.csv() and .coalesce()#

CSVs will also be written out in a distributed manner as multiple files. While this is desirable for parquet, it is not very useful for a CSV, whose main benefit is being human readable. First, write out the data with .write.csv(), using the path defined in the config:

output_path_csv = config["rescue_with_pop_path_csv"]
rescue_with_pop.write.csv(output_path_csv, header=True)

Again, look at the raw data in a file browser. You can see that it has written out a folder called rescue_with_pop.csv, with multiple files inside. Each of these on their own is a legitimate CSV file, with the correct headers.

To reduce the number of partitions, use .coalesce(numPartitions); this will combine existing partitions. Setting numPartitions to 1 will put all of the data on the same partition.

As the file will already exist, we need to tell Spark to overwrite the existing file. Use .mode("overwrite") after .write to do this:

(rescue_with_pop
    .coalesce(1)
    .write
    .mode("overwrite")
    .csv(output_path_csv, header=True))

Checking the file again, you can see that although the folder still exists, it will contain only one CSV file.

A neater way of writing out CSV files to a Hadoop file system is with Pydoop, which allows you to convert the Spark DataFrame to pandas first and then make use of .to_csv() from pandas, which has many options to control the output format.
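A minimal sketch of this approach, assuming Pydoop is installed and that /tmp/rescue_with_pop_single.csv is a suitable HDFS path; the exact API may differ in your environment:

import pydoop.hdfs as hdfs

# Bring the (small) DataFrame back to the driver as pandas
rescue_with_pop_pd = rescue_with_pop.toPandas()

# Write a single CSV file to HDFS using pandas .to_csv()
with hdfs.open("/tmp/rescue_with_pop_single.csv", "wt") as f:
    rescue_with_pop_pd.to_csv(f, index=False)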

Removing files#

Spark has no native way of removing files, so either use the standard Python methods, or delete them manually through a file browser. If on a local file system, use os.remove() to delete a file and shutil.rmtree() to remove a directory. If using HDFS or similar, then use subprocess.run(). Be careful when using the subprocess module as you will not get a warning when deleting files.

import subprocess
for f in [output_path_parquet, output_path_csv]:
    cmd = f"hdfs dfs -rm -r -skipTrash {f}"
    subprocess.run(cmd, shell=True)
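If you are running this example on a local file system rather than HDFS, the standard library can be used instead; shutil.rmtree() is used because Spark writes out folders rather than single files:

import shutil
for f in [output_path_parquet, output_path_csv]:
    shutil.rmtree(f, ignore_errors=True)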

Further Resources#

Spark at the ONS Articles:

PySpark Documentation:

Python Documentation:

pandas Documentation:

Other Links:

Acknowledgements#

Thanks to Karina Marks, Chris Musselle, Greg Payne and Dave Beech for creating the initial version of this article.