Flags in Spark
A flag is a column that indicates whether a certain condition, such as a threshold value, is met. The flag is typically a 0 or 1 depending on whether the condition has been met.
Our example shows how to create a flag for age differences greater than a threshold within a given group. Note: additional use cases for creating flags will be added to this page at a later date.
Creating an age-difference flag
Given group and age columns, we want to flag whether the two highest values in the age column within each group differ by more than a specified threshold.
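The goal can be pinned down before any Spark code is written. The following minimal plain-Python sketch (not Spark code) computes the flag directly from the two highest ages in each group. It assumes the data is a list of (group, age) pairs. The function name flag_top_two_gap is hypothetical, and the sample rows are copied from the PySpark dummy data output on this page.

```python
from collections import defaultdict
import heapq


def flag_top_two_gap(rows, threshold=5):
    """Return {group: 1 or 0}: 1 if the two highest ages differ by more than threshold.

    Hypothetical plain-Python illustration of the logic only; not Spark code.
    """
    ages_by_group = defaultdict(list)
    for group, age in rows:
        ages_by_group[group].append(age)

    flags = {}
    for group, ages in ages_by_group.items():
        top_two = heapq.nlargest(2, ages)  # highest age first
        if len(top_two) < 2:
            flags[group] = 0  # a single entry has no difference to compare
        else:
            flags[group] = 1 if top_two[0] - top_two[1] > threshold else 0
    return flags


# (group, age) pairs taken from the PySpark dummy data output
rows = [(0, 7), (0, 9), (0, 10), (1, 9), (1, 5), (1, 6), (2, 4), (2, 3), (2, 10),
        (3, 1), (3, 4), (3, 10), (4, 8), (4, 7), (4, 3)]
print(flag_top_two_gap(rows))
# {0: 0, 1: 0, 2: 1, 3: 1, 4: 0}
```

Groups 2 and 3 are flagged because their two highest ages (10 and 4) differ by 6.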
There are likely many ways of doing this. Below is one method that uses the functions available in pyspark.sql (or sparklyr and dplyr in R) on some dummy data.
As is good practice, first import the packages you will be using and start your Spark session:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window
library(sparklyr)
library(dplyr)
spark = (SparkSession.builder.master("local[2]")
.appName("flags")
.getOrCreate())
default_config <- sparklyr::spark_config()
sc <- sparklyr::spark_connect(
master = "local[2]",
app_name = "flags",
config = default_config)
Creating your dummy data
The dataframe df will have a group column with values from 0 to 4. To create multiple entries per group, we cross join it with the numbers 0 to 2 (.crossJoin() in PySpark, cross_join() in R) and then drop the latter column. For more information on cross joins, please refer to the page on cross joins. Finally, we add an age column containing random integers from 1 to 10.
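A minimal plain-Python sketch (no Spark) shows how the cross join works. itertools.product pairs every group value with every helper value, and the helper value is then discarded. The function name make_dummy_rows is hypothetical. Python's random module stands in for Spark's rand(), so the ages will not match the Spark outputs on this page.

```python
import itertools
import random


def make_dummy_rows(n_groups=5, rows_per_group=3, seed=42):
    """Build (group, age) pairs with the same shape as the Spark dummy data.

    Hypothetical illustration: Python's RNG is used, so values differ from Spark's rand().
    """
    rng = random.Random(seed)
    rows = []
    # Cross join: every group is paired with every helper value 0..rows_per_group-1,
    # and the helper is then thrown away (like dropping the "drop" column).
    for group, _helper in itertools.product(range(n_groups), range(rows_per_group)):
        age = rng.randint(1, 10)  # random integer from 1 to 10 inclusive
        rows.append((group, age))
    return rows


for row in make_dummy_rows():
    print(row)
```

Each group appears once per helper value. This is why the 5 groups become 15 rows.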
df = (
spark.range(5).select((F.col("id")).alias("group"))
.crossJoin(
spark.range(3)
.withColumnRenamed("id","drop")
).drop("drop")
.withColumn("age", F.ceil(F.rand(seed=42)*10)))
df.show()
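# note: set.seed() only seeds R's generator, not Spark's rand(), and df is a lazy
# query, so the random ages are regenerated each time df is evaluated
df <- sparklyr::sdf_seq(sc, 0, 4) %>%
rename("group" = "id") %>%
cross_join(sparklyr::sdf_seq(sc, 0, 2)) %>%
rename("drop" = "id") %>%
select(-c(drop)) %>%
mutate(age = ceil(rand()*10))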
print(df,n=15)
+-----+---+
|group|age|
+-----+---+
| 0| 7|
| 0| 9|
| 0| 10|
| 1| 9|
| 1| 5|
| 1| 6|
| 2| 4|
| 2| 3|
| 2| 10|
| 3| 1|
| 3| 4|
| 3| 10|
| 4| 8|
| 4| 7|
| 4| 3|
+-----+---+
# Source: spark<?> [?? x 2]
group age
<int> <dbl>
1 0 7
2 1 1
3 0 3
4 0 6
5 1 5
6 1 5
7 2 1
8 3 1
9 4 2
10 2 8
11 2 7
12 3 5
13 3 10
14 4 3
15 4 4
Creating your columns
The method outlined here spells out each step separately for clarity. Feel free to combine some of the steps, as long as the code stays readable.
Now that we have dummy data, we create a window partitioned by group and ordered by age in descending order. We then create the following columns:
age_lag: the next highest age within the group
age_diff: the difference between the age and age_lag columns
age_order: rows numbered from oldest to youngest within the group
top_two_age_diff: the age difference between the two oldest entries in the group, or 0 for all other rows
age_diff_flag: a flag showing whether top_two_age_diff is greater than a threshold (5 is chosen here)
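The five columns can be traced in a minimal plain-Python sketch (not Spark code). Each group's ages are sorted from oldest to youngest, and each row then receives the same values that the window functions produce. The function name add_age_columns is hypothetical. None plays the role of Spark's null.

```python
def add_age_columns(rows, threshold=5):
    """Plain-Python mirror of the window steps for (group, age) rows.

    Hypothetical illustration only; in Spark these come from a window partitioned by group.
    """
    groups = {}
    for group, age in rows:
        groups.setdefault(group, []).append(age)

    out = []
    for group, ages in groups.items():
        ordered = sorted(ages, reverse=True)  # like orderBy(F.desc("age"))
        for i, age in enumerate(ordered):
            # next highest age in the group, or None for the youngest row
            age_lag = ordered[i + 1] if i + 1 < len(ordered) else None
            age_diff = None if age_lag is None else age - age_lag
            age_order = i + 1  # like row_number(), starting at 1
            # keep the difference only on the oldest row; 0 elsewhere
            top_two_age_diff = age_diff if age_order == 1 else 0
            # None (a single-row group) is treated as "not flagged"
            flagged = top_two_age_diff is not None and top_two_age_diff > threshold
            out.append({"group": group, "age": age, "age_lag": age_lag,
                        "age_diff": age_diff, "age_order": age_order,
                        "top_two_age_diff": top_two_age_diff,
                        "age_diff_flag": 1 if flagged else 0})
    return out


for row in add_age_columns([(3, 1), (3, 4), (3, 10)]):
    print(row)
```

The three rows printed match group 3 in the PySpark output table. The ages are 10, 4 and 1, the top-two difference is 6 and the flag is 1.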
All of these columns except age_diff_flag are intermediate. They can be dropped once they are no longer needed, using .drop() in PySpark (or select() in R).
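# Window over each group, ordered from oldest to youngest
order_window = Window.partitionBy("group").orderBy(F.desc("age"))
# Next highest age in the group (lead by one row, equivalent to lag with offset -1)
df = df.withColumn("age_lag", F.lead(F.col("age"), 1).over(order_window))
# Difference between each age and the next highest age
df = df.withColumn("age_diff", F.col("age") - F.col("age_lag"))
# Number the rows in each group from oldest (1) to youngest
df = df.withColumn("age_order", F.row_number().over(order_window))
# Keep the difference only on the oldest row, i.e. the gap between the top two ages
df = df.withColumn("top_two_age_diff", F.when(F.col("age_order") == 1, F.col("age_diff")).otherwise(0))
# Flag rows where the top two ages differ by more than the threshold of 5
df = df.withColumn("age_diff_flag", F.when(F.col("top_two_age_diff") > 5, 1).otherwise(0))
df.show()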
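df <- df %>%
group_by(group) %>%
arrange(desc(age)) %>%
# next highest age in the group (lead by one row)
mutate(age_lag = lead(age, n = 1)) %>%
mutate(age_diff = age - age_lag) %>%
# number rows from oldest (1) to youngest within each group
mutate(age_order = row_number()) %>%
# keep the difference only on the oldest row
mutate(top_two_age_diff = ifelse(age_order == 1, age_diff, 0)) %>%
# flag rows where the top two ages differ by more than 5
mutate(age_diff_flag = ifelse(top_two_age_diff > 5, 1, 0))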
print(df, n = 15)
+-----+---+-------+--------+---------+----------------+-------------+
|group|age|age_lag|age_diff|age_order|top_two_age_diff|age_diff_flag|
+-----+---+-------+--------+---------+----------------+-------------+
| 0| 10| 9| 1| 1| 1| 0|
| 0| 9| 7| 2| 2| 0| 0|
| 0| 7| null| null| 3| 0| 0|
| 1| 9| 6| 3| 1| 3| 0|
| 1| 6| 5| 1| 2| 0| 0|
| 1| 5| null| null| 3| 0| 0|
| 3| 10| 4| 6| 1| 6| 1|
| 3| 4| 1| 3| 2| 0| 0|
| 3| 1| null| null| 3| 0| 0|
| 2| 10| 4| 6| 1| 6| 1|
| 2| 4| 3| 1| 2| 0| 0|
| 2| 3| null| null| 3| 0| 0|
| 4| 8| 7| 1| 1| 1| 0|
| 4| 7| 3| 4| 2| 0| 0|
| 4| 3| null| null| 3| 0| 0|
+-----+---+-------+--------+---------+----------------+-------------+
# Source: spark<?> [?? x 7]
# Groups: group
# Ordered by: desc(age)
group age age_lag age_diff age_order top_two_age_diff age_diff_flag
<int> <dbl> <dbl> <dbl> <int> <dbl> <dbl>
1 1 9 8 1 1 1 0
2 1 8 4 4 2 0 0
3 1 4 NA NA 3 0 0
4 3 10 8 2 1 2 0
5 3 8 5 3 2 0 0
6 3 5 NA NA 3 0 0
7 0 8 8 0 1 0 0
8 0 8 4 4 2 0 0
9 0 4 NA NA 3 0 0
10 2 9 6 3 1 3 0
11 2 6 2 4 2 0 0
12 2 2 NA NA 3 0 0
13 4 6 2 4 1 4 0
14 4 2 1 1 2 0 0
15 4 1 NA NA 3 0 0
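As you can see in the tables above, a flag column has been created with the values 0 and 1. If the difference between the two highest ages in a group is greater than 5, the flag is 1 on that group's oldest row. All other rows, and every row in groups with a difference of 5 or less, get 0. In the PySpark output, groups 2 and 3 are flagged because their two highest ages (10 and 4) differ by 6. The R output has no flagged groups because its random ages are different.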
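The condition is a strict "greater than", so a difference of exactly 5 is not flagged. The minimal plain-Python sketch below makes that boundary explicit. The helper name is_flagged is hypothetical and is not part of PySpark or sparklyr.

```python
def is_flagged(top_two_age_diff, threshold=5):
    """Return 1 if the difference is strictly greater than threshold, else 0."""
    return 1 if top_two_age_diff > threshold else 0


for diff in (4, 5, 6):
    print(diff, is_flagged(diff))
# 4 0
# 5 0
# 6 1
```

To flag a difference of exactly 5 as well, the comparison in the PySpark code would become F.col("top_two_age_diff") >= 5, and top_two_age_diff >= 5 in the R code.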
Further Resources
Spark at the ONS articles: