Example Spark Sessions#
This article gives some example Spark sessions, or Spark applications. For more information on Spark sessions and why you need to be careful with memory usage, please consult the Guidance on Spark Sessions and Configuration Hierarchy and spark-defaults.conf
.
Remember to only use a Spark session for as long as you need. It’s good etiquette to use spark.stop()
(for PySpark) or spark_disconnect(sc)
(for sparklyr) in your scripts. Stopping the container or Jupyter Notebook session will also close the Spark session if one is running.
Local mode#
As mentioned at the top of the article on Guidance on Spark Sessions there are two modes to running a Spark application, one is local mode (this example) and the other is cluster mode (all other examples below). Local mode can be used when running a Spark application on a single computer or node.
Details:
Utilises resource of a single node or machine
This example uses 2 cores
Amount of memory used depends on the node or machine
Use case:
Developing code using dummy or synthetic data or a small sample of data
Writing unit tests
Example of actual usage:
Pipeline development using dummy data
Throughout this book
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.master("local[2]")
.appName("local_session")
.getOrCreate()
)
library(sparklyr)
sc <- sparklyr::spark_connect(
master = "local[2]",
app_name = "local-session",
config = sparklyr::spark_config())
Note that all dependencies must also be in place to run a Spark application on your laptop, see Setting up resources section in the Getting Started with Spark article for further information.
Default Session#
As a starting point you can create a Spark session with all the default options. This is the bare minimum you need to create a Spark session and will work fine in most cases.
Please use this session by default or if unsure in any way about your resource requirements.
Details:
Will give you the default config options
Amount of resource depends on your specific platform
Use case:
When unsure of your requirements
Example of actual usage:
Investigation of new or unfamiliar data sources
Building a new pipeline where full user requirements aren’t yet known
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("default-session")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()
)
library(sparklyr)
default_config <- sparklyr::spark_config()
sc <- sparklyr::spark_connect(
master = "yarn-client",
app_name = "default-session",
config = default_config)
Small Session#
This is the smallest session that will realistically be used in DAP. It is similar in size to that used for DAP CATS training, as the low memory and only one core means several people can run this session at the same time on Dev Test, the cluster with the smallest capacity.
Details:
Only 1g of memory and 3 executors
Only 1 core
Number of partitions are limited to 12, which can improve performance with smaller data
Use case:
Simple data exploration of small survey data
Training and demonstrations when several people need to run Spark sessions simultaneously
Example of actual usage:
Used for DAPCATS PySpark training, with mostly simple calculations on small data
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("small-session")
.config("spark.executor.memory", "1g")
.config("spark.executor.cores", 1)
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.maxExecutors", 3)
.config("spark.sql.shuffle.partitions", 12)
.config("spark.ui.showConsoleProgress", "false")
.enableHiveSupport()
.getOrCreate()
)
library(sparklyr)
small_config <- sparklyr::spark_config()
small_config$spark.executor.memory <- "1g"
small_config$spark.executor.cores <- 1
small_config$spark.dynamicAllocation.enabled <- "true"
small_config$spark.dynamicAllocation.maxExecutors <- 3
small_config$spark.sql.shuffle.partitions <- 12
sc <- sparklyr::spark_connect(
master = "yarn-client",
app_name = "small-session",
config = small_config)
Medium Session#
A standard session used for analysing survey or synthetic datasets. Also used for some Production pipelines based on survey and/or smaller administrative data.
Details:
6g of memory and 3 executors
3 cores
Number of partitions are limited to 18, which can improve performance with smaller data
Use case:
Developing code in Dev Test
Data exploration in Production
Developing Production pipelines on a sample of data
Running smaller Production pipelines on mostly survey data
Example of actual usage:
Complex calculations, but on smaller synthetic data in Dev Test
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("medium-session")
.config("spark.executor.memory", "6g")
.config("spark.executor.cores", 3)
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.maxExecutors", 3)
.config("spark.sql.shuffle.partitions", 18)
.config("spark.ui.showConsoleProgress", "false")
.enableHiveSupport()
.getOrCreate()
)
library(sparklyr)
medium_config <- sparklyr::spark_config()
medium_config$spark.executor.memory <- "6g"
medium_config$spark.executor.cores <- 3
medium_config$spark.dynamicAllocation.enabled <- "true"
medium_config$spark.dynamicAllocation.maxExecutors <- 3
medium_config$spark.sql.shuffle.partitions <- 18
sc <- sparklyr::spark_connect(
master = "yarn-client",
app_name = "medium-session",
config = medium_config)
Large Session#
Session designed for running Production pipelines on large administrative data, rather than just survey data. Will often develop using a sample and a smaller session then change to this once the pipeline is complete.
Details:
10g of memory and 5 executors
1g of memory overhead
5 cores, which is generally optimal on larger sessions
The default number of 200 partitions
Use case:
Production pipelines on administrative data
Cannot be used in Dev Test, as it exceeds the 9 GB limit per executor
Example of actual usage:
One administrative dataset of 100 million rows
Many calculations
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("large-session")
.config("spark.executor.memory", "10g")
.config("spark.yarn.executor.memoryOverhead", "1g")
.config("spark.executor.cores", 5)
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.maxExecutors", 5)
.config("spark.sql.shuffle.partitions", 200)
.config("spark.ui.showConsoleProgress", "false")
.enableHiveSupport()
.getOrCreate()
)
library(sparklyr)
large_config <- sparklyr::spark_config()
large_config$spark.executor.memory <- "10g"
large_config$spark.yarn.executor.memoryOverhead <- "1g"
large_config$spark.executor.cores <- 5
large_config$spark.dynamicAllocation.enabled <- "true"
large_config$spark.dynamicAllocation.maxExecutors <- 5
large_config$spark.sql.shuffle.partitions <- 200
sc <- sparklyr::spark_connect(
master = "yarn-client",
app_name = "large-session",
config = large_config)
Extra Large session#
Used for the most complex pipelines, with huge administrative data sources and complex calculations. This uses a large amount of resource on the cluster, so only use when running Production pipelines.
It is even more important when using more resource to close your Spark session once finished.
Details:
20g of memory and 12 executors
2g of memory overhead
5 cores; using too many cores can actually cause worse performance on larger sessions
240 partitions; not significantly higher than the default of 200, but it is best for these to be a multiple of cores and executors
Use case:
Running large, complex pipelines in Production on mostly administrative data
Do not use for development purposes; use a smaller session and work on a sample of data or synthetic data
Example of actual usage:
Three administrative datasets of around 300 million rows
Significant calculations, including joins and writing and reading to many intermediate tables
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("xl-session")
.config("spark.executor.memory", "20g")
.config("spark.yarn.executor.memoryOverhead", "2g")
.config("spark.executor.cores", 5)
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.maxExecutors", 12)
.config("spark.sql.shuffle.partitions", 240)
.config("spark.ui.showConsoleProgress", "false")
.enableHiveSupport()
.getOrCreate()
)
library(sparklyr)
xl_config <- sparklyr::spark_config()
xl_config$spark.executor.memory <- "20g"
xl_config$spark.yarn.executor.memoryOverhead <- "2g"
xl_config$spark.executor.cores <- 5
xl_config$spark.dynamicAllocation.enabled <- "true"
xl_config$spark.dynamicAllocation.maxExecutors <- 12
xl_config$spark.sql.shuffle.partitions <- 240
sc <- sparklyr::spark_connect(
master = "yarn-client",
app_name = "xl-session",
config = xl_config)
Closing Spark sessions#
Remember to close your sessions once finished, so that the memory can be re-allocated to another user.
spark.stop()
sparklyr::spark_disconnect(sc)
Further Resources#
Spark at the ONS Articles:
PySpark Documentation:
sparklyr and tidyverse Documentation: