Groups not Loops
Most of the time, Spark is better at processing one big job than lots of smaller ones. Every action, such as .count() or .toPandas(), triggers a Spark job, and the work within one large job can be processed in parallel. When actions are called in a loop, however, the jobs run in serial: the second job can only start once the first has completely finished, and each job re-executes its plan from the source data unless that data has been cached. As a general principle, you should therefore try to avoid loops in your code if they contain an action.
To refactor your code to avoid loops, you can often group the data instead. This means calling one action on a larger DataFrame, which will almost always be quicker than calling several actions on smaller DataFrames.
Let’s look at two examples: a simple one using groupBy(), and a slightly more complex one using a window function.
Replace a loop with groupBy()
First, import all the modules needed and create a Spark session:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
import pandas as pd
spark = (SparkSession.builder.master("local[2]")
.appName("groups-not-loops")
.getOrCreate())
library(sparklyr)
library(dplyr)
sc <- sparklyr::spark_connect(
  master = "local[2]",
  app_name = "groups-not-loops",
  config = sparklyr::spark_config())
Read in the data and select and rename the columns used in this example:
rescue = (spark
.read.csv("/training/animal_rescue.csv", header=True, inferSchema=True)
.withColumnRenamed("AnimalGroupParent", "AnimalGroup")
.withColumnRenamed("IncidentNotionalCost(£)", "TotalCost")
.withColumnRenamed("FinalDescription", "Description")
.select("IncidentNumber", "AnimalGroup", "CalYear", "TotalCost", "Description"))
rescue.limit(5).toPandas()
rescue <- sparklyr::spark_read_csv(
sc,
path = "/training/animal_rescue.csv",
header = TRUE,
infer_schema = TRUE) %>%
sparklyr::select(
AnimalGroup = AnimalGroupParent,
TotalCost = IncidentNotionalCostGBP,
Description = FinalDescription,
CalYear = CalYear,
IncidentNumber = IncidentNumber)
head(rescue) %>%
sparklyr::collect() %>%
print()
IncidentNumber AnimalGroup CalYear TotalCost \
0 139091 Dog 2009 510.0
1 275091 Fox 2009 255.0
2 2075091 Dog 2009 255.0
3 2872091 Horse 2009 255.0
4 3553091 Rabbit 2009 255.0
Description
0 DOG WITH JAW TRAPPED IN MAGAZINE RACK,B15
1 ASSIST RSPCA WITH FOX TRAPPED,B15
2 DOG CAUGHT IN DRAIN,B15
3 HORSE TRAPPED IN LAKE,J17
4 RABBIT TRAPPED UNDER SOFA,B15
# A tibble: 6 × 5
AnimalGroup TotalCost Description CalYear IncidentNumber
<chr> <dbl> <chr> <int> <chr>
1 Dog 510 DOG WITH JA… 2009 139091
2 Fox 255 ASSIST RSPC… 2009 275091
3 Dog 255 DOG CAUGHT … 2009 2075091
4 Horse 255 HORSE TRAPP… 2009 2872091
5 Rabbit 255 RABBIT TRAP… 2009 3553091
6 Unknown - Heavy Livestock Animal 255 ANIMAL TRAP… 2009 3742091
First, we want to get the total number of cats rescued. This is simple to do with .filter() and .count():
rescue.filter(F.col("AnimalGroup") == "Cat").count()
rescue %>%
sparklyr::filter(AnimalGroup == "Cat") %>%
count() %>%
print()
2909
# Source: spark<?> [?? x 1]
n
<dbl>
1 2909
Now let’s assume that we want to count some other animals: dogs, hamsters and deer.
The temptation here is to look at our existing code, see that we can easily adapt it to include any animal, and put it in a loop, appending the result to a list each time.
In practice, this is a common way that loops end up being written in Spark code; you have some code which works and a loop is seen as the easiest way to generalise it.
animals = ["Cat", "Dog", "Hamster", "Deer"]
result = []
for animal in animals:
    # .count() is an action, so every iteration submits a separate Spark job
    result.append(rescue.filter(F.col("AnimalGroup") == animal).count())
pdf = pd.DataFrame({"AnimalGroup": animals, "count": result}).sort_values(by=["count"], ascending=False)
pdf
animals = c("Cat", "Dog", "Hamster", "Deer")
result = list()
for (animal in animals) {
  count <- rescue %>%
    sparklyr::filter(AnimalGroup == animal) %>%
    count() %>%
    sparklyr::collect() %>%  # collect() is an action, so each animal runs a separate job
    as.numeric()
  result <- append(result, count)
}
animal_counts <- data.frame(AnimalGroup = animals, count = unlist(result))
head(animal_counts)
AnimalGroup count
0 Cat 2909
1 Dog 1008
3 Deer 94
2 Hamster 14
AnimalGroup count
1 Cat 2909
2 Dog 1008
3 Hamster 14
4 Deer 94
The issue with this code is efficiency. .count() is an action and it is called four times, so Spark executes the plan four separate times, once for each animal. Ideally, we want to call as few actions as possible, as this will make our code much more efficient.
Rather than using a loop, we can filter the data with .isin() (%in% in R), group it by AnimalGroup, and then count. This gives the same result as above, but by grouping we submit only one action to the cluster rather than four. This should make the code run much faster, as well as being easier to read.
(rescue
.filter(F.col("AnimalGroup").isin(animals))
.groupBy("AnimalGroup")
.count()
.orderBy("count", ascending=False)
.toPandas()
)
rescue %>%
sparklyr::filter(AnimalGroup %in% animals) %>%
dplyr::group_by(AnimalGroup) %>%
dplyr::summarise(count = n()) %>%
dplyr::arrange(desc(count)) %>%
sparklyr::collect() %>%
print()
AnimalGroup count
0 Cat 2909
1 Dog 1008
2 Deer 94
3 Hamster 14
# A tibble: 4 × 2
AnimalGroup count
<chr> <dbl>
1 Cat 2909
2 Dog 1008
3 Deer 94
4 Hamster 14
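Why grouping helps can be sketched in plain Python by counting how many full passes over the data each approach makes. This is an illustration, not Spark code: the toy records and the functions counts_with_loop and counts_with_group are hypothetical stand-ins for the rescue DataFrame and the two approaches above, and in Spark each pass of the loop version corresponds to a separate job.

```python
from collections import Counter

# Hypothetical toy records standing in for the rescue DataFrame
rows = [
    {"AnimalGroup": "Cat"},
    {"AnimalGroup": "Dog"},
    {"AnimalGroup": "Cat"},
    {"AnimalGroup": "Deer"},
    {"AnimalGroup": "Bird"},
    {"AnimalGroup": "Hamster"},
    {"AnimalGroup": "Cat"},
]
animals = ["Cat", "Dog", "Hamster", "Deer"]


def counts_with_loop(rows, animals):
    """Loop version: one full pass over the data per animal,
    like calling .count() once per iteration."""
    passes = 0
    result = {}
    for animal in animals:
        passes += 1  # every "action" scans all the rows again
        result[animal] = sum(1 for r in rows if r["AnimalGroup"] == animal)
    return result, passes


def counts_with_group(rows, animals):
    """Grouped version: a single pass that filters (like .isin())
    and counts per group (like .groupBy().count())."""
    wanted = set(animals)
    counter = Counter(r["AnimalGroup"] for r in rows if r["AnimalGroup"] in wanted)
    return dict(counter), 1  # one pass in total
```

One difference to watch for when refactoring: an animal with no matching rows gets a count of 0 from the loop but is simply absent from the grouped result, just as groupBy().count() in Spark only returns groups that exist in the data.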
Replace a loop with a window function
The example above was relatively simple. Sometimes it’s not as straightforward to refactor your loop into a group.
Now let’s find the three most expensive rescues of cats, dogs, hamsters and deer. We can write a function which does this for one animal, and then call it within a loop:
def get_expensive_incidents(df, animal):
    return (df
        .filter(F.col("AnimalGroup") == animal)
        .select("IncidentNumber", "CalYear", "TotalCost", "Description")
        .orderBy(F.desc("TotalCost"), "IncidentNumber")  # use IncidentNumber as a tie-break to ensure determinism
        .limit(3)  # just get the top 3
    )
get_expensive_incidents <- function(df, animal) {
  df %>%
    sparklyr::filter(AnimalGroup == animal) %>%
    sparklyr::select(IncidentNumber, CalYear, TotalCost, Description, AnimalGroup) %>%
    dplyr::arrange(desc(TotalCost), IncidentNumber) %>%
    head(3)
}
result = []
for animal in animals:
    # .toPandas() is an action, so every iteration submits a separate Spark job
    pdf = get_expensive_incidents(rescue, animal).toPandas()
    pdf["AnimalGroup"] = animal
    result.append(pdf)
pd.concat(result).sort_values(by=["AnimalGroup", "TotalCost"], ascending=[True, False]).reset_index()
result <- list()  # create an empty list to store our results from the loop
for (animal in animals) {
  expensive_animal <- get_expensive_incidents(rescue, animal)  # apply our function to each animal
  expensive_animal$AnimalGroup <- animal
  result <- append(result, list(expensive_animal))  # add result to list
}
expensive_rescues <- do.call(rbind, result) %>% #convert list into dataframe
dplyr::arrange(AnimalGroup, desc(TotalCost))
expensive_rescues %>%
collect() %>%
print()
index IncidentNumber CalYear TotalCost \
0 0 098141-28072016 2016 3912.0
1 1 49076141 2014 2655.0
2 2 028258-08032017 2017 2282.0
3 0 101755111 2011 2340.0
4 1 144664-11102018 2018 1998.0
5 2 174801-27122016 2016 1304.0
6 0 154461131 2013 2030.0
7 1 098400-17072018 2018 1665.0
8 2 033179-19032017 2017 1630.0
9 0 040568-06042016 2016 652.0
10 1 050586-29042016 2016 326.0
11 2 099706-31072016 2016 326.0
Description AnimalGroup
0 CAT STUCK WITHIN WALL SPACE RSPCA IN ATTENDANCE Cat
1 KITTEN TRAPPED IN CHIMNEY Cat
2 ASSIST RSPCA WITH CAT IN CHIMNEY Cat
3 DEER IN CANAL - WATER RESCUE LEVEL 2 Deer
4 DEER ENTANGLED - RSPCA ON SCENE 07969305977 Deer
5 TWO DEER STUCK IN FENCING RVP OUTSIDE PORTLAND... Deer
6 DOG TRAPPED BETWEEN TWO HOUSE Dog
7 PUPPY TRAPPED BETWEEN TWO HOUSES FRU REQ FROM... Dog
8 DOG STUCK ON ISLAND IN LARGE POND WATER RESCUE... Dog
9 HAMSTER STUCK UNDER FLOORBOARDS Hamster
10 HAMSTER STUCK IN CAVITY WALL Hamster
11 HAMSTER TRAPPED UNDER FLOORBOARDS Hamster
# A tibble: 12 × 5
IncidentNumber CalYear TotalCost Description AnimalGroup
<chr> <int> <dbl> <chr> <chr>
1 098141-28072016 2016 3912 CAT STUCK WITHIN WALL SPACE R… Cat
2 49076141 2014 2655 KITTEN TRAPPED IN CHIMNEY Cat
3 028258-08032017 2017 2282 ASSIST RSPCA WITH CAT IN CHIMN… Cat
4 101755111 2011 2340 DEER IN CANAL - WATER RESCUE L… Deer
5 144664-11102018 2018 1998 DEER ENTANGLED - RSPCA ON SCEN… Deer
6 174801-27122016 2016 1304 TWO DEER STUCK IN FENCING RVP … Deer
7 154461131 2013 2030 DOG TRAPPED BETWEEN TWO HOUSE Dog
8 098400-17072018 2018 1665 PUPPY TRAPPED BETWEEN TWO HOUS… Dog
9 033179-19032017 2017 1630 DOG STUCK ON ISLAND IN LARGE P… Dog
10 040568-06042016 2016 652 HAMSTER STUCK UNDER FLOORBOARDS Hamster
11 050586-29042016 2016 326 HAMSTER STUCK IN CAVITY WALL Hamster
12 099706-31072016 2016 326 HAMSTER TRAPPED UNDER FLOORBOA… Hamster
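As in the previous example, the Python version calls an action (.toPandas()) inside the loop, so it submits four separate jobs. (The R version only calls collect() after the loop, but the combined plan still filters and sorts the data separately for each animal.) To rewrite this, we can use a window function. The comments in the function should help explain the process; for a full explanation, see Using Window Functions for Ranking.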
def get_expensive_incidents_grouped(df, animals):
    return (df
        .filter(F.col("AnimalGroup").isin(animals))  # Only include specified animals
        .select("IncidentNumber", "CalYear", "TotalCost", "Description", "AnimalGroup")  # Keep AnimalGroup this time
        .withColumn("row_no", F.row_number().over(  # Number the rows within each group
            Window
            .partitionBy("AnimalGroup")  # One window per animal group
            .orderBy(F.desc("TotalCost"), "IncidentNumber")))  # Order as before, with the same tie-break
        .filter(F.col("row_no") <= 3)  # Only return the top 3 per group
        .drop("row_no")  # Remove the column used in the calculation
    )
(get_expensive_incidents_grouped(rescue, animals)
.orderBy(["AnimalGroup", "TotalCost"], ascending=[True, False])
.toPandas()
)
get_expensive_incidents_grouped <- function(df, animals) {
  df %>%
    sparklyr::filter(AnimalGroup %in% animals) %>%  # only include specified animals
    sparklyr::select(IncidentNumber, CalYear, TotalCost, Description, AnimalGroup) %>%
    dplyr::group_by(AnimalGroup) %>%  # one window per animal group
    dplyr::arrange(desc(TotalCost), IncidentNumber) %>%  # order as before, with the same tie-break
    sparklyr::filter(row_number() <= 3) %>%  # only return the top 3 per group
    dplyr::ungroup() %>%
    dplyr::arrange(AnimalGroup, desc(TotalCost), IncidentNumber)
}
expensive_rescues <- get_expensive_incidents_grouped(rescue, animals)
expensive_rescues %>%
collect() %>%
print()
IncidentNumber CalYear TotalCost \
0 098141-28072016 2016 3912.0
1 49076141 2014 2655.0
2 028258-08032017 2017 2282.0
3 101755111 2011 2340.0
4 144664-11102018 2018 1998.0
5 174801-27122016 2016 1304.0
6 154461131 2013 2030.0
7 098400-17072018 2018 1665.0
8 033179-19032017 2017 1630.0
9 040568-06042016 2016 652.0
10 050586-29042016 2016 326.0
11 099706-31072016 2016 326.0
Description AnimalGroup
0 CAT STUCK WITHIN WALL SPACE RSPCA IN ATTENDANCE Cat
1 KITTEN TRAPPED IN CHIMNEY Cat
2 ASSIST RSPCA WITH CAT IN CHIMNEY Cat
3 DEER IN CANAL - WATER RESCUE LEVEL 2 Deer
4 DEER ENTANGLED - RSPCA ON SCENE 07969305977 Deer
5 TWO DEER STUCK IN FENCING RVP OUTSIDE PORTLAND... Deer
6 DOG TRAPPED BETWEEN TWO HOUSE Dog
7 PUPPY TRAPPED BETWEEN TWO HOUSES FRU REQ FROM... Dog
8 DOG STUCK ON ISLAND IN LARGE POND WATER RESCUE... Dog
9 HAMSTER STUCK UNDER FLOORBOARDS Hamster
10 HAMSTER STUCK IN CAVITY WALL Hamster
11 HAMSTER TRAPPED UNDER FLOORBOARDS Hamster
# A tibble: 12 × 5
IncidentNumber CalYear TotalCost Description AnimalGroup
<chr> <int> <dbl> <chr> <chr>
1 098141-28072016 2016 3912 CAT STUCK WITHIN WALL SPACE R… Cat
2 49076141 2014 2655 KITTEN TRAPPED IN CHIMNEY Cat
3 028258-08032017 2017 2282 ASSIST RSPCA WITH CAT IN CHIMN… Cat
4 101755111 2011 2340 DEER IN CANAL - WATER RESCUE L… Deer
5 144664-11102018 2018 1998 DEER ENTANGLED - RSPCA ON SCEN… Deer
6 174801-27122016 2016 1304 TWO DEER STUCK IN FENCING RVP … Deer
7 154461131 2013 2030 DOG TRAPPED BETWEEN TWO HOUSE Dog
8 098400-17072018 2018 1665 PUPPY TRAPPED BETWEEN TWO HOUS… Dog
9 033179-19032017 2017 1630 DOG STUCK ON ISLAND IN LARGE P… Dog
10 040568-06042016 2016 652 HAMSTER STUCK UNDER FLOORBOARDS Hamster
11 050586-29042016 2016 326 HAMSTER STUCK IN CAVITY WALL Hamster
12 099706-31072016 2016 326 HAMSTER TRAPPED UNDER FLOORBOA… Hamster
We get the same result, but this version should run much faster because it calls only one action.
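If window functions are new to you, the logic of row_number() over a window partitioned by AnimalGroup can be sketched in plain Python. This only illustrates the technique and is not how Spark executes it: top_n_per_group and the toy rows are hypothetical, and the sketch assumes no TotalCost values are missing.

```python
from itertools import groupby


def top_n_per_group(rows, group_key, n=3):
    """Plain-Python analogue of row_number() over a window partitioned by
    group_key and ordered by TotalCost descending, then IncidentNumber."""
    # Sort by the partition key first so groupby() sees each group contiguously,
    # then by the window ordering within each partition (IncidentNumber is the tie-break).
    ordered = sorted(rows, key=lambda r: (r[group_key], -r["TotalCost"], r["IncidentNumber"]))
    result = []
    for _, partition in groupby(ordered, key=lambda r: r[group_key]):
        for row_no, row in enumerate(partition, start=1):  # row_number() starts at 1
            if row_no <= n:  # equivalent of filtering on row_no <= 3
                result.append(row)
    return result


# Hypothetical toy data: four cats, and two hamsters with tied costs
rows = [
    {"IncidentNumber": "c1", "AnimalGroup": "Cat", "TotalCost": 100.0},
    {"IncidentNumber": "c2", "AnimalGroup": "Cat", "TotalCost": 300.0},
    {"IncidentNumber": "c3", "AnimalGroup": "Cat", "TotalCost": 200.0},
    {"IncidentNumber": "c4", "AnimalGroup": "Cat", "TotalCost": 50.0},
    {"IncidentNumber": "h2", "AnimalGroup": "Hamster", "TotalCost": 326.0},
    {"IncidentNumber": "h1", "AnimalGroup": "Hamster", "TotalCost": 326.0},
]
top = top_n_per_group(rows, "AnimalGroup")
```

Without the IncidentNumber tie-break, which of two equally expensive rescues is kept would not be guaranteed, which is why both Spark versions above include it.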
Where loops aren’t a problem
If you’re not calling an action, then each iteration of your loop just adds an extra step to the plan. In this example we create a new column on each iteration, which is a transformation, not an action. The only action is .toPandas() (collect() in R), called outside of the loop.
for animal in animals:
    # withColumn() is a transformation: it only adds a step to the plan
    rescue = rescue.withColumn(animal, F.when(F.col("AnimalGroup") == animal, True).otherwise(False))
rescue.orderBy("IncidentNumber").limit(5).toPandas()
for (animal in animals) {
  rescue <- rescue %>%
    sparklyr::mutate(!!animal := ifelse(
      AnimalGroup == animal, TRUE, FALSE))
}
rescue %>%
dplyr::arrange(IncidentNumber) %>%
head(5) %>%
collect() %>%
print()
IncidentNumber AnimalGroup CalYear TotalCost \
0 000014-03092018M Unknown - Heavy Livestock Animal 2018 999.0
1 000099-01012017 Dog 2017 652.0
2 000260-01012017 Bird 2017 326.0
3 000375-01012017 Dog 2017 652.0
4 000477-01012017 Deer 2017 326.0
Description Cat Dog Hamster \
0 None False False False
1 DOG WITH HEAD STUCK IN RAILINGS CALLED BY OWNER False True False
2 BIRD TRAPPED IN NETTING BY THE 02 SHOP AND NEA... False False False
3 DOG STUCK IN HULL OF DERELICT BOAT - WATER RES... False True False
4 DEER TRAPPED IN RAILINGS JUNCTION WITH DENNIS ... False False False
Deer
0 False
1 False
2 False
3 False
4 True
# A tibble: 5 × 9
AnimalGroup TotalCost Description CalYear IncidentNumber Cat Dog Hamster
<chr> <dbl> <chr> <int> <chr> <lgl> <lgl> <lgl>
1 Unknown - He… 999 <NA> 2018 000014-030920… FALSE FALSE FALSE
2 Dog 652 DOG WITH H… 2017 000099-010120… FALSE TRUE FALSE
3 Bird 326 BIRD TRAPP… 2017 000260-010120… FALSE FALSE FALSE
4 Dog 652 DOG STUCK … 2017 000375-010120… FALSE TRUE FALSE
5 Deer 326 DEER TRAPP… 2017 000477-010120… FALSE FALSE FALSE
# ℹ 1 more variable: Deer <lgl>
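If you can find a more Pythonic (or, in R, more idiomatic) way to write loops that don’t call actions, then that is recommended, but it is much less of an issue for efficiency. One caveat from the PySpark documentation: calling .withColumn() many times, for instance in a loop, can generate big plans which cause performance issues and even a StackOverflowException; to add many columns, use a single .select() instead.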
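Why a loop of transformations is cheap can be sketched with a toy model of lazy evaluation in plain Python. LazyFrame, with_column() and collect() are hypothetical names chosen for illustration, not Spark APIs: transformations only append to a recorded plan, and the whole plan is executed once, when the action is called.

```python
class LazyFrame:
    """Toy, hypothetical stand-in for a Spark DataFrame: transformations are
    recorded in a plan and only executed when an action is called."""

    def __init__(self, rows, plan=(), stats=None):
        self._rows = rows
        self.plan = tuple(plan)  # recorded transformations, not yet run
        # Shared counter so derived frames report how often the plan actually ran
        self.stats = stats if stats is not None else {"executions": 0}

    def with_column(self, name, func):
        # Transformation: return a new frame with one more step in the plan.
        # No rows are processed here.
        return LazyFrame(self._rows, self.plan + ((name, func),), self.stats)

    def collect(self):
        # Action: run every step of the plan over the data, once.
        self.stats["executions"] += 1
        out = []
        for row in self._rows:
            new_row = dict(row)
            for name, func in self.plan:
                new_row[name] = func(new_row)
            out.append(new_row)
        return out


rows = [{"AnimalGroup": "Cat"}, {"AnimalGroup": "Deer"}]
animals = ["Cat", "Dog", "Hamster", "Deer"]

lf = LazyFrame(rows)
for animal in animals:
    # The default argument binds the current animal (avoids Python's late binding)
    lf = lf.with_column(animal, lambda r, a=animal: r["AnimalGroup"] == a)

steps_before_action = len(lf.plan)
executions_before_action = lf.stats["executions"]
result = lf.collect()  # the single action, called outside the loop
```

The loop grows the plan by one step per animal, but the data is only processed when collect() runs.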
Finally: testing!
When refactoring your code, always use unit tests or a regression test to check that its functionality is preserved. It’s far better to have slow code which works than quick code which doesn’t!
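A regression test for this kind of refactor can be as simple as comparing the old and new outputs while ignoring row order, because row order is not guaranteed unless you sort explicitly (the R loop output and the groupBy() output earlier on this page list Deer and Hamster in different orders). The helper names normalise and assert_same_rows are hypothetical, and the fixture values are copied from those outputs. If you are on Spark 3.5 or later, pyspark.testing.assertDataFrameEqual offers a similar comparison for DataFrames directly.

```python
def normalise(rows):
    """Turn a list of row dicts into a sorted list of tuples so that
    comparisons ignore row order (and dict key order)."""
    return sorted(tuple(sorted(row.items())) for row in rows)


def assert_same_rows(actual, expected):
    """Fail with a readable message if the two results differ as sets of rows."""
    assert normalise(actual) == normalise(expected), (
        f"Refactored output differs:\n{actual}\n!=\n{expected}"
    )


# Output of the original loop (R version, unsorted)
loop_result = [
    {"AnimalGroup": "Cat", "count": 2909},
    {"AnimalGroup": "Dog", "count": 1008},
    {"AnimalGroup": "Hamster", "count": 14},
    {"AnimalGroup": "Deer", "count": 94},
]
# Output of the refactored groupBy() version (sorted by count)
grouped_result = [
    {"AnimalGroup": "Cat", "count": 2909},
    {"AnimalGroup": "Dog", "count": 1008},
    {"AnimalGroup": "Deer", "count": 94},
    {"AnimalGroup": "Hamster", "count": 14},
]

assert_same_rows(grouped_result, loop_result)  # passes: same rows, different order
```

In a real project the two results would come from calling the old and new functions on the same input, rather than from copied values.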
Further Resources
Documentation: