Groups not Loops

Most of the time, Spark is better at processing one big job than lots of smaller ones. This is because one large Spark job can be processed in parallel, whereas lots of small jobs submitted from a loop are processed in serial: the second job can only start once the first has completely finished. As a general principle, try to avoid loops in your code if they contain an action.

To refactor your code to avoid loops, you can often group the data instead. This will mean you are calling one action on a larger DataFrame which will almost certainly be quicker than calling several actions on smaller DataFrames.

Let’s look at two examples, one simple, using groupBy(), and one a little more complex using a window function.

Replace a loop with groupBy()

First, import all the modules needed and create a Spark session:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
import pandas as pd

spark = (SparkSession.builder.master("local[2]")
         .appName("groups-not-loops")
         .getOrCreate())

Read in the data and select and rename the columns used in this example:

rescue = (spark
          .read.csv("/training/animal_rescue.csv", header=True, inferSchema=True)
          .withColumnRenamed("AnimalGroupParent", "AnimalGroup")
          .withColumnRenamed("IncidentNotionalCost(£)", "TotalCost")
          .withColumnRenamed("FinalDescription", "Description")
          .select("IncidentNumber", "AnimalGroup", "CalYear", "TotalCost", "Description"))

rescue.limit(5).toPandas()
  IncidentNumber AnimalGroup  CalYear  TotalCost  \
0         139091         Dog     2009      510.0   
1         275091         Fox     2009      255.0   
2        2075091         Dog     2009      255.0   
3        2872091       Horse     2009      255.0   
4        3553091      Rabbit     2009      255.0   

                                 Description  
0  DOG WITH JAW TRAPPED IN MAGAZINE RACK,B15  
1          ASSIST RSPCA WITH FOX TRAPPED,B15  
2                    DOG CAUGHT IN DRAIN,B15  
3                  HORSE TRAPPED IN LAKE,J17  
4              RABBIT TRAPPED UNDER SOFA,B15  

First, we want to get the total number of cats rescued. This is simple to do with .filter() and .count():

rescue.filter(F.col("AnimalGroup") == "Cat").count()
2909

Now let’s assume that we want to count some other animals: dogs, hamsters and deer.

The temptation here is to look at our existing code, see that we can easily adapt it to include any animal, and put it in a loop, appending the result to a list each time.

In practice, this is a common way that loops end up being written in Spark code; you have some code which works and a loop is seen as the easiest way to generalise it.

animals = ["Cat", "Dog", "Hamster", "Deer"]

result = []

for animal in animals:
    result.append(rescue.filter(F.col("AnimalGroup") == animal).count())

pdf = pd.DataFrame({"AnimalGroup": animals, "count": result}).sort_values(by=["count"], ascending=False)
pdf
  AnimalGroup  count
0         Cat   2909
1         Dog   1008
3        Deer     94
2     Hamster     14

The issue with this code is efficiency. .count() is called four times, once per animal, so Spark executes the plan four separate times. Ideally, we want to call as few actions as possible, as this will make our code much more efficient.

Rather than use a loop, we can instead filter the data using .isin(), group the data by AnimalGroup, and then count. This gives the same result as above, but by grouping we submit only one action to the cluster, rather than four. This will make the code run much faster, as well as making it easier to read.

(rescue
    .filter(F.col("AnimalGroup").isin(animals))
    .groupBy("AnimalGroup")
    .count()
    .orderBy("count", ascending=False)
    .toPandas()
)
  AnimalGroup  count
0         Cat   2909
1         Dog   1008
2        Deer     94
3     Hamster     14
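
One behavioural difference is worth checking when making this swap: the loop records a count of 0 for an animal with no matching rows, whereas groupBy() only returns groups that exist in the data. A minimal sketch of restoring the missing rows in pandas after .toPandas() is shown here. The "Unicorn" entry and the hand-written grouped_pdf are hypothetical stand-ins for illustration, not part of the rescue data:

```python
import pandas as pd

# Hypothetical list that includes an animal with no rescues in the data
animals = ["Cat", "Dog", "Hamster", "Deer", "Unicorn"]

# Stand-in for the pandas DataFrame returned by the grouped query;
# "Unicorn" is absent because groupBy() only returns groups that exist
grouped_pdf = pd.DataFrame({
    "AnimalGroup": ["Cat", "Dog", "Deer", "Hamster"],
    "count": [2909, 1008, 94, 14],
})

# Reindex on the full list so that missing animals appear with a count of 0
complete_pdf = (grouped_pdf
    .set_index("AnimalGroup")
    .reindex(animals, fill_value=0)
    .rename_axis("AnimalGroup")  # keep the index name explicit before resetting
    .reset_index()
    .sort_values(by=["count"], ascending=False, ignore_index=True)
)
```

This step runs on the small pandas result rather than in Spark, so it does not add another action.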

Replace a loop with a window function

The example above was relatively simple. Sometimes it’s not as straightforward to refactor your loop into a group.

Let’s get the three most expensive cats, dogs, hamsters and deer to rescue. We can write a function which does this, and then call it within a loop:

def get_expensive_incidents(df, animal):
    return(df
        .filter(F.col("AnimalGroup") == animal)
        .select("IncidentNumber", "CalYear", "TotalCost", "Description")
        .orderBy(F.desc("TotalCost"), "IncidentNumber") # use IncidentNumber as a tie-break to ensure determinism
        .limit(3) # just get the top 3
          )

result = []

for animal in animals:
    pdf = get_expensive_incidents(rescue, animal).toPandas()
    pdf["AnimalGroup"] = animal
    result.append(pdf)

pd.concat(result).sort_values(by=["AnimalGroup", "TotalCost"], ascending=[True, False]).reset_index()
    index   IncidentNumber  CalYear  TotalCost  \
0       0  098141-28072016     2016     3912.0   
1       1         49076141     2014     2655.0   
2       2  028258-08032017     2017     2282.0   
3       0        101755111     2011     2340.0   
4       1  144664-11102018     2018     1998.0   
5       2  174801-27122016     2016     1304.0   
6       0        154461131     2013     2030.0   
7       1  098400-17072018     2018     1665.0   
8       2  033179-19032017     2017     1630.0   
9       0  040568-06042016     2016      652.0   
10      1  050586-29042016     2016      326.0   
11      2  099706-31072016     2016      326.0   

                                          Description AnimalGroup  
0    CAT STUCK WITHIN WALL SPACE  RSPCA IN ATTENDANCE         Cat  
1                           KITTEN TRAPPED IN CHIMNEY         Cat  
2                    ASSIST RSPCA WITH CAT IN CHIMNEY         Cat  
3                DEER IN CANAL - WATER RESCUE LEVEL 2        Deer  
4         DEER ENTANGLED - RSPCA ON SCENE 07969305977        Deer  
5   TWO DEER STUCK IN FENCING RVP OUTSIDE PORTLAND...        Deer  
6                       DOG TRAPPED BETWEEN TWO HOUSE         Dog  
7   PUPPY TRAPPED BETWEEN TWO HOUSES  FRU REQ FROM...         Dog  
8   DOG STUCK ON ISLAND IN LARGE POND WATER RESCUE...         Dog  
9                     HAMSTER STUCK UNDER FLOORBOARDS     Hamster  
10                       HAMSTER STUCK IN CAVITY WALL     Hamster  
11                  HAMSTER TRAPPED UNDER FLOORBOARDS     Hamster  

As in the previous example, this calls four actions, one per animal. To rewrite this, we can use a window function. The comments in the function should help explain the process, but for a full understanding please look at Using Window Functions for Ranking.

def get_expensive_incidents_grouped(df, animals):
    return(df
        .filter(F.col("AnimalGroup").isin(animals)) # Only include specified animals
        .select("IncidentNumber", "CalYear", "TotalCost", "Description", "AnimalGroup") # Add AnimalGroup to the select
        .withColumn("row_no", F.row_number().over(Window # Number the incidents within each group
                                                .partitionBy("AnimalGroup") # Group by AnimalGroup
                                                .orderBy(F.desc("TotalCost"), "IncidentNumber"))) # Order as before
        .filter(F.col("row_no") <= 3) # Only return the top 3 per group
        .drop("row_no") # Remove the column used in the calculation
          )

(get_expensive_incidents_grouped(rescue, animals)
    .orderBy(["AnimalGroup", "TotalCost"], ascending=[True, False])
    .toPandas()
)
     IncidentNumber  CalYear  TotalCost  \
0   098141-28072016     2016     3912.0   
1          49076141     2014     2655.0   
2   028258-08032017     2017     2282.0   
3         101755111     2011     2340.0   
4   144664-11102018     2018     1998.0   
5   174801-27122016     2016     1304.0   
6         154461131     2013     2030.0   
7   098400-17072018     2018     1665.0   
8   033179-19032017     2017     1630.0   
9   040568-06042016     2016      652.0   
10  050586-29042016     2016      326.0   
11  099706-31072016     2016      326.0   

                                          Description AnimalGroup  
0    CAT STUCK WITHIN WALL SPACE  RSPCA IN ATTENDANCE         Cat  
1                           KITTEN TRAPPED IN CHIMNEY         Cat  
2                    ASSIST RSPCA WITH CAT IN CHIMNEY         Cat  
3                DEER IN CANAL - WATER RESCUE LEVEL 2        Deer  
4         DEER ENTANGLED - RSPCA ON SCENE 07969305977        Deer  
5   TWO DEER STUCK IN FENCING RVP OUTSIDE PORTLAND...        Deer  
6                       DOG TRAPPED BETWEEN TWO HOUSE         Dog  
7   PUPPY TRAPPED BETWEEN TWO HOUSES  FRU REQ FROM...         Dog  
8   DOG STUCK ON ISLAND IN LARGE POND WATER RESCUE...         Dog  
9                     HAMSTER STUCK UNDER FLOORBOARDS     Hamster  
10                       HAMSTER STUCK IN CAVITY WALL     Hamster  
11                  HAMSTER TRAPPED UNDER FLOORBOARDS     Hamster  

The result is the same, but this will run much quicker because only one action is called.

Where loops aren’t a problem

If you’re not calling an action, then your loop will just add an extra step to the plan. In this example we are creating a new column each time, which is a transformation, not an action. The single action is .toPandas(), called outside of the loop.

for animal in animals:
    rescue = rescue.withColumn(animal, F.when(F.col("AnimalGroup") == animal, True).otherwise(False))

rescue.orderBy("IncidentNumber").limit(5).toPandas()
     IncidentNumber                       AnimalGroup  CalYear  TotalCost  \
0  000014-03092018M  Unknown - Heavy Livestock Animal     2018      999.0   
1   000099-01012017                               Dog     2017      652.0   
2   000260-01012017                              Bird     2017      326.0   
3   000375-01012017                               Dog     2017      652.0   
4   000477-01012017                              Deer     2017      326.0   

                                         Description    Cat    Dog  Hamster  \
0                                               None  False  False    False   
1    DOG WITH HEAD STUCK IN RAILINGS CALLED BY OWNER  False   True    False   
2  BIRD TRAPPED IN NETTING BY THE 02 SHOP AND NEA...  False  False    False   
3  DOG STUCK IN HULL OF DERELICT BOAT - WATER RES...  False   True    False   
4  DEER TRAPPED IN RAILINGS JUNCTION WITH DENNIS ...  False  False    False   

    Deer  
0  False  
1  False  
2  False  
3  False  
4   True  

If you can find a more Pythonic way to write loops that don’t call actions then that is recommended, but such loops are much less of an issue for efficiency.

Finally: testing!

When refactoring your code, always use unit tests or a regression test to make sure that the functionality of the code is preserved. It’s far better to have slow code which works than quick code which doesn’t!
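
As a minimal sketch of such a regression test, the pandas output of the original loop can be compared with the output of the refactored code using pd.testing.assert_frame_equal. The helper name assert_same_rows and the small hand-written DataFrames are hypothetical stand-ins; in practice you would pass in the results of the old and new code:

```python
import pandas as pd

def assert_same_rows(expected, actual, sort_cols):
    """Hypothetical helper: check that two pandas DataFrames hold the same
    rows, ignoring row order and column order."""
    def normalise(pdf):
        return (pdf[sorted(pdf.columns)]
                .sort_values(by=sort_cols)
                .reset_index(drop=True))
    pd.testing.assert_frame_equal(normalise(expected), normalise(actual))

# Stand-in for the result of the loop version
loop_pdf = pd.DataFrame({"AnimalGroup": ["Cat", "Dog", "Hamster", "Deer"],
                         "count": [2909, 1008, 14, 94]})

# Stand-in for the result of the grouped version, in a different row order
grouped_pdf = pd.DataFrame({"AnimalGroup": ["Cat", "Dog", "Deer", "Hamster"],
                            "count": [2909, 1008, 94, 14]})

# Raises an AssertionError if the refactor changed the result
assert_same_rows(loop_pdf, grouped_pdf, sort_cols=["AnimalGroup"])
```

Sorting both results before comparing matters because Spark does not guarantee row order unless an explicit .orderBy() is applied.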

Further Resources

Documentation: