R UDFs#

When coding with Spark, you will generally want to use sparklyr or Spark SQL functions wherever possible. However, there may be instances where no Spark function is available to do what you need. In cases such as this, one option is to use an R user-defined function (UDF).

The sparklyr package comes with the spark_apply() function, which is designed for this purpose. It allows you to apply a function written in R to a Spark object (e.g. a Spark DataFrame). In order for this to work, you need to ensure that your cluster administrator, cloud provider, or you (in the case of running Spark locally) has configured your cluster by installing either:

  • R on every node

  • Apache Arrow on every node (requires Spark 2.3 or later)

Although only one of these is required to make use of spark_apply(), in practice, it is recommended to use spark_apply() with Apache Arrow, as it provides significant performance improvements. In general, R UDFs should be considered a ‘last resort’, where there is no equivalent functionality compatible with Spark and an R function needs to be run on a large Spark DataFrame. They are generally very inefficient due to the need to serialise and deserialise the data between Spark and R. Apache Arrow speeds this up and allows data to be transferred more efficiently between Spark and R, but even with these improvements spark_apply() is still best avoided if an alternative can be found. For a detailed explanation of why spark_apply() is more efficient with Apache Arrow, see the Distributed R chapter of Mastering Spark with R.

It is sometimes not possible to avoid using a UDF, for example if we want to use a specialised statistical R package on our data that has no Spark equivalent. The examples below demonstrate how to use spark_apply(). They are written assuming that the user does not have Apache Arrow available, but if your configuration enables the use of Arrow the examples can be adapted by making sure you have the arrow package installed (install.packages("arrow")) and simply loading the arrow library (library(arrow)) before establishing the Spark connection.
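
For instance, the start of each example might then look something like the sketch below (assuming the arrow package is already installed in your package library):

# Sketch: adapting the examples to use Apache Arrow
# install.packages("arrow")   # only needed if arrow is not already in your package library

library(arrow)      # load arrow before connecting so spark_apply() can make use of it
library(dplyr)
library(sparklyr)

# ...then set up the Spark configuration and connection exactly as in the examples below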

Example 1: Simple UDF and loading packages onto worker nodes#

This example represents a simple UDF just to demonstrate how spark_apply() can be used. In practice, it is far too simple to necessitate using a UDF - we could definitely do this with sparklyr code instead of R code!

The example UDF below will make use of the mutate() function from the dplyr package. In order for Spark to run our UDF, we need to make sure that the packages we need are installed on each worker node in our Spark cluster once we have set up our session. To do this, we need to pass the location of our R package library to the Spark cluster by including the spark.r.libpaths setting as shown below:

library(dplyr)
library(sparklyr)

# Set up our Spark configuration - including our library path
default_config <- spark_config()
default_config["spark.r.libpaths"] <- .libPaths()[1]

sc <- spark_connect(
  master = "local[2]",
  app_name = "r-udfs",
  config = default_config)
  

You can replace .libPaths()[1] with the full path to your main R package library in quotation marks if you need to. If you are unsure where this is, running .libPaths() in your session console will show you a list of library locations that R searches for installed packages.
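
For example, you could inspect your library locations and hard-code the one you need (the path below is only a placeholder for whatever .libPaths() shows on your system):

# List the library locations R searches for installed packages
.libPaths()

# Use a specific library path instead of .libPaths()[1] - replace with one of the paths shown above
default_config["spark.r.libpaths"] <- "/home/<user>/R/x86_64-pc-linux-gnu-library/4.1"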

We have told Spark where to look for R packages, but they are not yet installed on the worker nodes. The first time we run spark_apply() in our Spark session, any packages found in the specified library path will be copied over to the Spark cluster. As a result, the most efficient way to load packages onto the cluster is to run a ‘dummy’ UDF on a small dataset before running our actual function. This ensures that the packages are loaded and ready for use before Spark attempts to perform more complex operations on our data. We can define and run our dummy UDF as shown:

# Define dummy UDF 'libload' which loads our required function libraries
# You could just define an empty function here, but this forces Spark to output 
# a list of libraries loaded on the cluster so we can see it has worked 

libload <- function() {
  library(dplyr)
}

# All packages will be loaded on to worker nodes during first call to spark_apply in session
# so it is more efficient to do this 'on' a minimal sdf (of length 1) first

sdf_len(sc, 1) |> sparklyr::spark_apply(f = libload,
                packages = FALSE)

Confusingly, to load packages using this method we have had to set the packages argument in spark_apply() to FALSE. This is because there is another method for loading packages onto the cluster, using spark_apply_bundle(). This function is designed to bundle all packages into a .tar file ready to pass to the Spark cluster, and setting the packages argument to TRUE prompts Spark to search for this .tar file and extract the relevant packages. However, this method is not easily generalisable to different system setups and may not always be possible with a given configuration (e.g. it does not seem to work with S3 bucket storage). For this reason, this guide demonstrates the alternative approach outlined above for using packages with spark_apply().
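
For reference, creating such a bundle looks roughly like the sketch below; how the resulting .tar file is then made available to the cluster depends on your storage setup, which is why this guide does not rely on it.

# Sketch only: bundle the packages found in .libPaths() into a .tar file
# (by default the bundle is written to the current working directory)
bundle_path <- sparklyr::spark_apply_bundle()

bundle_path  # path to the .tar file that would need to be accessible to the cluster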

Now we have loaded our R packages on to the cluster, we can set up and run our UDF. For this example we can define a dummy Spark dataframe to work with:

# Set up a dummy Spark dataframe
sdf <- sparklyr::sdf_seq(sc, -7, 8, 2) |>
       dplyr::mutate(half_id = id / 2) |>
       dplyr::select(half_id)

# View the data    
sdf |>
    dplyr::collect() |>
    print()

Now we need to set up our UDF. R UDFs are defined using the same syntax as regular R functions. However, you may need to ensure that any calls to functions from loaded R packages are written as <package_name>::<package_function> to ensure that Spark can find the specified function (e.g. dplyr::mutate() in the example below):

round_udf <- function(df) {
    x <- df |>
        dplyr::mutate(rounded_col = round(half_id)) # need to specify dplyr package
    return(x)
}

This defines a simple function to create a new column rounded_col by rounding the half_id column in our Spark dataframe. Of course, in reality we would never need a UDF for something so simple, as this can be done entirely in Spark, but it serves as a simplified example of how R UDFs can be used. We are now ready to use spark_apply() to apply the round_udf() function to our dataframe:

rounded <- sparklyr::spark_apply(sdf,
   f = round_udf,
   # Specify schema - works faster if you specify a schema
   columns = c(half_id = "double",
               rounded_col = "double"),
   packages = FALSE)

# View the resulting Spark dataframe
rounded

# disconnect the Spark session
spark_disconnect(sc)

Note that we have included a columns argument in the above example to enable us to specify a schema for the dataframe. It is a good idea to do this where possible as it speeds up the running of the UDF. If no schema is specified, Spark will need to identify it before applying the UDF which can slow things down somewhat.

The above example can easily be adapted for any R function which takes a single argument (the dataframe). However, most functions will require additional arguments to be passed in. We will look at how to do this with the next example using the context argument of spark_apply.

Example 2: Passing additional arguments to a UDF#

First, we need to set up our Spark connection and load any required packages onto the cluster. Please note that if you don’t already have these packages installed you will need to install them before setting up your Spark connection so they can be found in your library and copied over to the cluster.
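
If any of them are missing, they can be installed into your library first, for example:

# Install any missing packages into your R library before connecting to Spark
install.packages(c("dplyr", "janitor", "rlang"))

library(dplyr)
library(sparklyr)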

# Set up our Spark configuration - including our library path
default_config <- spark_config()
default_config["spark.r.libpaths"] <- .libPaths()[1]

sc <- spark_connect(
  master = "local[2]",
  app_name = "r-udfs",
  config = default_config)


# Define the libload function to load our required packages onto the cluster
libload <- function() {
  library(dplyr)
  library(janitor)
  library(rlang)

}

# All packages will be loaded on to worker nodes during first call to spark_apply in session
# so it is more efficient to do this on a minimal sdf (of length 1) first

sdf_len(sc, 1) |> sparklyr::spark_apply(f = libload,
                packages = FALSE)
                

Next, we will generate a Spark dataframe to test our function on and define a more complex version of our rounding UDF from Example 1. In this example, we have assumed we want to pass two extra values to the function: col, which is the name of the column we want to apply the rounding to, and precision, which is the number of decimal places we want to round to. However, all of the arguments you need to pass to your UDF must be included in the spark_apply() context. As a result, we need to define our UDF as having just two arguments: the dataframe, and the context, which is passed as a list of all the required UDF arguments. We can then split our additional arguments out of the list and into individual objects inside the function definition, as shown below.

Note that this time, when defining the function we are using both mutate from the dplyr package and sym from rlang, so we have specified both of the package names in our function definition. The !!rlang::sym(col) converts the column name (which is passed into the function as a string) to a column object inside the function.

# Set up a dummy Spark dataframe
sdf <- sparklyr::sdf_seq(sc, -7, 8, 2) |>
       dplyr::mutate(half_id = id / 2) |>
       dplyr::select(half_id)
       
# Define our UDF to take multiple arguments passed into a list named `context`
multi_arg_udf <- function(df, context) {

   col <- context$col               # Split out the `col` argument from the list of arguments in `context`
   precision <- context$precision   # Split out the `precision` argument from the list of arguments in `context`
   
   x <- df |>
        dplyr::mutate(rounded_col = round(!!rlang::sym(col), precision)) #need to specify both rlang and dplyr packages
    return(x)
}

Now we can run the UDF on our Spark dataframe as before, passing values to the col and precision arguments inside the multi_arg_udf function from the context list as shown below. We’ll run it a couple of times, passing a different value to precision just to confirm it works as expected.

# Run the function on our data using spark_apply, passing the `col` and `precision` arguments as a list using `context`
multi0 <- sparklyr::spark_apply(sdf, multi_arg_udf,
                                context = list(col = "half_id",
                                               precision = 0),
                                columns = c(half_id = "double",
                                            rounded_col = "double"),
                                packages = FALSE)


# Run the function on our data again using spark_apply, this time with a different `precision` passed via `context`
multi1 <- sparklyr::spark_apply(sdf, multi_arg_udf,
                                context = list(col = "half_id",
                                               precision = 1),
                                columns = c(half_id = "double",
                                            rounded_col = "double"),
                                packages = FALSE)

# View the result with `col` = "half_id" and precision = 0
multi0

# View the result with `col` = "half_id" and precision = 1
multi1

spark_disconnect(sc)

Note that even if you only have one additional argument to pass into your UDF, you will still need to pass this in via the context argument as a list, but with only a single element (i.e. context = list(arg1 = value1)). You can define any number of additional arguments this way, provided you remember to split each one out of the context list inside your UDF.
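
For instance, a version of the rounding UDF from Example 1 that takes a single precision argument might look something like this sketch (round_prec_udf is just an illustrative name, and the code assumes the sdf and Spark connection from the example above are still available):

# Sketch: a UDF with one extra argument, still passed via `context` as a single-element list
round_prec_udf <- function(df, context) {
    precision <- context$precision
    df |>
        dplyr::mutate(rounded_col = round(half_id, precision))
}

rounded_1dp <- sparklyr::spark_apply(sdf, round_prec_udf,
                                     context = list(precision = 1),
                                     columns = c(half_id = "double",
                                                 rounded_col = "double"),
                                     packages = FALSE)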

These first two examples are very simple and we have not been paying any attention to partitioning in our data. However, for real, large, partitioned datasets we need to think carefully about how our data is partitioned, since spark_apply() receives each partition (rather than the whole dataset) and applies the function to each one. The example in the next section shows how care must be taken with partitioning data in order to get reliable results from spark_apply().

Example 3: Partitions and the group_by argument#

In this example, we will read in some partitioned data and use spark_apply() to perform an operation on it. We can set up our session and add packages to the cluster in the exact same way as we did before:

library(sparklyr)
library(dplyr)

# Set up our Spark configuration - including our library path
default_config <- spark_config()
default_config["spark.r.libpaths"] <- .libPaths()[1]

# open a spark connection
sc <- spark_connect(
  master = "local[2]",
  app_name = "r-udfs",
  config = default_config)


# Define the libload function to load our required packages onto the cluster
libload <- function() {
  library(dplyr)
  library(janitor)
  library(rlang)

}

# pre-load packages onto the cluster using a dummy spark dataframe and function
sdf_len(sc, 1) |> sparklyr::spark_apply(f = libload,
                packages = FALSE)

Next, we can read in the data we want to analyse. The repartition argument in spark_read_parquet has been set to 5 just to ensure there are multiple partitions in the data.

config <- yaml::yaml.load_file("ons-spark/config.yaml")

# read in and partition data
rescue <- sparklyr::spark_read_parquet(sc, config$rescue_clean_path, repartition = 5)

# preview data
dplyr::glimpse(rescue)

We can simplify this dataset a bit for our example and convert the columns that have been read in with the incorrect datatype:

# Drop unwanted columns and convert cal_year and total_cost to numeric
rescue_tidy <- rescue |>
  dplyr::select(incident_number, cal_year, total_cost, animal_group, borough) |>
  dplyr::mutate(across(c(cal_year, total_cost),
                ~as.numeric(.)))

glimpse(rescue_tidy)

If we check how this data has been partitioned, we can see that Spark has just taken arbitrary cuts of the data and split it across the partitions accordingly. As we would expect, there is no commonality between the data on one partition compared with the next, and we have a mixture of years, animals and boroughs in each partition.

# check number of partitions
num_part <- sdf_num_partitions(rescue_tidy)

num_part

# Loop over partitions and view the top few rows of each 
# Remember that partitions are numbered from zero so we need to loop over 0 to `num_part-1`
for(i in 0:(num_part - 1)) {
  rescue_tidy |>
    filter(spark_partition_id() == i) |>
    print(n = 10)
}

Let us now try running a UDF on this data. Again, this example is not a good use case for a UDF, as the code could easily be run in sparklyr instead, but it will serve to illustrate how partitioning and grouping work with R UDFs.

We will write a UDF that will allow us to aggregate the data based on user input for a particular animal_group and borough, to generate a summary table of the total cost by cal_year of a particular animal type in a given area.

year_cost <- function(df, context) {

  # Split out the arguments from `context`
  # Note: the local names differ from the column names (animal_group, borough) so that
  # dplyr's data masking does not compare the columns with themselves inside filter()
  animal <- context$animal_group
  chosen_borough <- context$borough

  x <- df |>
    dplyr::filter(animal_group == animal,
                  borough == chosen_borough) |>
    dplyr::group_by(cal_year) |>
    dplyr::summarise(total_yearly_cost_for_group = sum(total_cost, na.rm = TRUE))

  return(x)

}

Now, we can try running it using spark_apply on our data. Let us say that we want to know the total cost by calendar year of rescuing cats in Hackney:

# Apply our UDF
hackney_cats <- sparklyr::spark_apply(rescue_tidy,
                          year_cost, 
                          context = list(animal_group = "Cat", 
                                         borough = "Hackney"),
                          columns = c(cal_year = "double", 
                                      total_yearly_cost_for_group = "double"),
                          packages = FALSE)

# Total cost by year for Cats in Hackney
hackney_cats |> 
  arrange(cal_year) |> 
  print(n = 55)

Something has clearly gone wrong here! Instead of returning an 11 row table, with one total cost for each year, we have multiple rows per year - up to one for each of the 5 partitions. This is a result of the arbitrary partitioning applied to the data when we read it in. Since spark_apply() only receives data from individual partitions and applies the UDF to each one separately, the output has not been recombined as if the function had been applied to the entire dataset. Instead, we get a separate total per year from each partition!

A better approach is to use the group_by argument in spark_apply() to both group the data by cal_year and partition it accordingly. We will also need to adjust our UDF, as there is no longer any need to include group_by(cal_year) within it. Making these changes produces the following output:

year_cost_no_group <- function(df, context) {
  # As before, use local names that do not clash with the column names
  animal <- context$animal_group
  chosen_borough <- context$borough

  x <- df |>
    dplyr::filter(animal_group == animal,
                  borough == chosen_borough) |>
    dplyr::summarise(total_yearly_cost_for_group = sum(total_cost, na.rm = TRUE))

  return(x)

}

hackney_cats_year <- sparklyr::spark_apply(rescue_tidy,
                          group_by = "cal_year",
                          year_cost_no_group, 
                          context = list(animal_group = "Cat", 
                                         borough = "Hackney"),
                          columns = c(cal_year = "double", 
                                      total_yearly_cost_for_group = "double"),
                          packages = FALSE)


hackney_cats_year |> 
  arrange(cal_year) |> 
  print(n = 11)

This is much better! We now have the output we expected.

Note that if we check the number of partitions of this output dataset (using sdf_num_partitions(hackney_cats_year)) the number returned will be the spark.sql.shuffle.partitions default value as we have performed a wide transformation on the data. It is a good idea to check and manage your partitions after running a UDF for this reason, as you may need to repartition to optimise your code.
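
For example, we could check the number of partitions and, if necessary, reduce it with sdf_repartition() (the target of 2 partitions below is arbitrary, chosen only to illustrate the idea):

# Check the number of partitions of the output - this will be the shuffle default (200 unless changed)
sdf_num_partitions(hackney_cats_year)

# Repartition the small output dataset to a more sensible number of partitions
hackney_cats_year <- sparklyr::sdf_repartition(hackney_cats_year, partitions = 2)

sdf_num_partitions(hackney_cats_year)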

Further resources#