Median in Spark#

Introduction#

When working with big data, simple operations like computing the median can have significant computational costs associated with them. In Spark, estimation strategies are employed to compute medians and other quantiles, with the accuracy of the estimation being balanced against the computational cost.

First, we will set up a Spark session and read in the config file.

from pyspark.sql import SparkSession, functions as F
import yaml

spark = (SparkSession.builder.master("local[2]")
         .appName("ons-spark")
         .getOrCreate())

with open("../../../config.yaml") as f:
    config = yaml.safe_load(f)

Read in the population dataset.

pop_df = spark.read.parquet(config["population_path"])

pop_df.printSchema()
root
 |-- postcode_district: string (nullable = true)
 |-- population: long (nullable = true)

Computing the median for a Spark DataFrame#

We can compute medians for big data in Spark using the Greenwald-Khanna algorithm, which approximates a given quantile or quantiles of a distribution when the number of observations is very large. The relative error of the function can be adjusted to give a more accurate estimate of a given quantile at the cost of increased computation.

In PySpark, the Greenwald-Khanna algorithm is implemented as the approxQuantile method of pyspark.sql.DataFrame. To find the exact median of the population column with PySpark, we call approxQuantile on our population DataFrame and specify the column name, an array containing the quantile of interest (in this case the median, or second quartile, 0.5), and the relative error, which is set to 0 to give the exact median.

In sparklyr, the Greenwald-Khanna algorithm is implemented with sdf_quantile. To find the exact median of the population column with sparklyr, we apply sdf_quantile to our population DataFrame, specifying the same parameters as in the PySpark example: an array containing the quantile to compute (the median, 0.5) and the relative error (0).

pop_df.approxQuantile("population", [0.5], 0)
[22331.0]

In the next example, we will compute the 1st, 2nd, and 3rd quartiles of the population column, which we will do by passing an array containing the numeric values of the three quartiles as our second argument (0.25, 0.5 and 0.75, respectively). We will assume that computing three quantiles exactly would be too computationally expensive given our available resources, so we will increase the relative error parameter to reduce the accuracy of our estimates in return for decreased computation cost. We will increase the relative error parameter to 0.01, or 1%.

pop_df.approxQuantile("population", [0.25, 0.5, 0.75], 0.01)
[12051.0, 22059.0, 33495.0]

What is the relationship between the relative error parameter and the accuracy of the estimated result?

exact = pop_df.approxQuantile("population", [0.5], 0)[0]
estimate = pop_df.approxQuantile("population", [0.5], 0.01)[0]

round((exact - estimate)/exact, 3)
0.012

By specifying a relative error of 1%, we can see that the difference between the estimated median of the population column and the exact median is approximately 1%.
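
As a further illustrative check, we can tighten the relative error and recompute the estimate; with a smaller relative error the estimate should typically lie closer to the exact median. This is a minimal sketch reusing the exact value and pop_df DataFrame from above; the value returned will depend on the data.

# Tighten the relative error from 1% to 0.1% and compare against the exact median
tighter_estimate = pop_df.approxQuantile("population", [0.5], 0.001)[0]

round((exact - tighter_estimate)/exact, 3)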

Computing the median for aggregated data#

Usually when performing data analysis you will want to find the median of aggregations created from your dataset, rather than just computing the median of entire columns. We will first read in the borough and postcode information from the animal rescue dataset and join these with the population dataset.

borough_df = spark.read.parquet(config["rescue_clean_path"])

borough_df = borough_df.select("borough", F.upper(F.col("postcodedistrict")).alias("postcode_district"))

pop_borough_df = borough_df.join(
    pop_df,
    on = "postcode_district",
    how = "left"
).filter(F.col("borough").isNotNull())

pop_borough_df.show(10)
+-----------------+--------------------+----------+
|postcode_district|             borough|population|
+-----------------+--------------------+----------+
|             SE19|             Croydon|     27639|
|             SE25|             Croydon|     34521|
|              SM5|              Sutton|     38291|
|              UB9|          Hillingdon|     14336|
|              RM3|            Havering|     40272|
|             RM10|Barking And Dagenham|     38157|
|              E11|      Waltham Forest|     55128|
|              E12|           Redbridge|     41869|
|              CR0|             Croydon|    153812|
|               E5|             Hackney|     47669|
+-----------------+--------------------+----------+
only showing top 10 rows

Next, we will aggregate the population data across boroughs in the combined pop_borough_df and find the median postcode population for each borough.

The DAP environment is currently limited to using PySpark version 2.4.0, but there are two functions present in later versions that are useful for calculating quantiles and medians:

  • In PySpark 3.1.0, percentile_approx was added to pyspark.sql.functions, which allows you to calculate quantiles on aggregations. The parameters required by percentile_approx are similar to those of approxQuantile, with the difference being that you must provide an accuracy instead of a relative error, where accuracy is defined as \( \frac{1}{\text{relative error}} \) (so the accuracy of 100,000 used below corresponds to a relative error of 0.00001).

pop_borough_df.groupBy("borough").agg(
        F.percentile_approx("population", [0.5], 100000).alias("median_postcode_population")
).show()

  • In PySpark 3.4.0, median was added to pyspark.sql.functions, which further simplifies computing the median within aggregations, as it requires neither a quantile parameter nor an accuracy parameter.

pop_borough_df.groupBy("borough").agg(
        F.median("population").alias("median_postcode_population")
).show()

In PySpark 2.4.0, to calculate quantiles inside aggregations we can use the built-in Spark SQL approx_percentile function by passing SQL code to the PySpark API as a string inside expr. The parameters of the SQL function are identical to those of the PySpark percentile_approx function; however, we must give the list of quantiles we wish to calculate inside a SQL array().

In sparklyr, we can apply the percentile_approx function to an aggregation inside a summarise function. The arguments passed to percentile_approx in sparklyr are identical to those in the PySpark implementation.

pop_borough_df.groupBy("borough").agg(
        F.expr('approx_percentile(population, array(0.5), 100000)').alias("median_postcode_population")
).show()
+--------------------+--------------------------+
|             borough|median_postcode_population|
+--------------------+--------------------------+
|       Epping Forest|                   [23599]|
|             Croydon|                   [48428]|
|          Wandsworth|                   [64566]|
|              Bexley|                   [34767]|
|Kingston Upon Thames|                   [31384]|
|             Lambeth|                   [43845]|
|Kensington And Ch...|                   [22381]|
|              Camden|                   [60910]|
|           Brentwood|                   [24715]|
|           Greenwich|                   [63082]|
|Barking And Dagenham|                   [38157]|
|              Newham|                   [52244]|
|       Tower Hamlets|                   [69523]|
|              Barnet|                   [29882]|
|            Hounslow|                   [36147]|
|              Harrow|                   [44781]|
|      City Of London|                      [39]|
|           Islington|                   [47204]|
|               Brent|                   [67621]|
|            Haringey|                   [43025]|
+--------------------+--------------------------+
only showing top 20 rows

Close the Spark session to prevent any hanging processes.

spark.stop()

The relationship between relative error and computational cost#

The Greenwald-Khanna algorithm has a worst-case space requirement of \( O(\frac{1}{\epsilon}\log(\epsilon N)) \), where \( \epsilon \) is the relative error and \( N \) is the size of the input data. Given a constant input size, empirical measurements show that increasing the precision of \( \epsilon \) by a factor of 10, from 0.1 to 0.01, leads to a space requirement increase of between 8 and 10 times, depending on the size of the input data. Increasing the precision of \( \epsilon \) by a further factor of 10, from 0.01 to 0.001, leads to a smaller space requirement increase of between 7 and 9 times. For most Spark applications, you will be able to specify a high degree of precision when calculating quantiles with implementations of the Greenwald-Khanna algorithm, because the space requirement grows less than proportionally with each increase in precision.
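
To make this concrete, here is a rough back-of-the-envelope sketch in plain Python (no Spark session needed) that evaluates the worst-case term above for a few values of \( \epsilon \); the input size of \( N = 10^8 \) is an assumption chosen purely for illustration, not taken from the population data.

import math

def gk_space_bound(epsilon, n):
    # Worst-case space term from the bound above: (1 / epsilon) * log(epsilon * N)
    return (1 / epsilon) * math.log(epsilon * n)

n = 10**8  # illustrative input size
for eps_coarse, eps_fine in [(0.1, 0.01), (0.01, 0.001)]:
    ratio = gk_space_bound(eps_fine, n) / gk_space_bound(eps_coarse, n)
    print(f"{eps_coarse} -> {eps_fine}: bound grows by a factor of roughly {ratio:.1f}")

Each tenfold increase in precision grows the bound by somewhat less than tenfold, which is consistent with the empirical measurements described above.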

Further reading#