Reading and Writing Data in Spark#
This chapter will go into more detail about the various file formats available to use with Spark, and how Spark interacts with these file formats. Introduction to PySpark and Introduction to SparklyR briefly covered CSV files and Parquet files and some basic differences between them. The final section of the page will cover the importance of managing partitions when writing data to disk, for further information on partitions see Managing Partitions.
This chapter will provide more detail on parquet files, CSV files, ORC files and Avro files, the differences between them and how to read and write data using these formats.
Let’s start by setting up a Spark session and reading in the config file.
from pyspark.sql import SparkSession, functions as F
import yaml
spark = (SparkSession.builder.master("local[2]")
.appName("reading-data")
.getOrCreate())
with open("../../../config.yaml") as f:
config = yaml.safe_load(f)
library(sparklyr)
library(dplyr)
sc <- sparklyr::spark_connect(
master = "local[2]",
app_name = "ons-spark",
config = sparklyr::spark_config(),
)
config <- yaml::yaml.load_file("ons-spark/config.yaml")
CSV files#
Reading in a CSV file#
To read in a CSV file, you can use spark.read.csv()
with PySpark or spark_read_csv()
with SparklyR as demonstrated below.
animal_rescue = spark.read.csv(config["rescue_path_csv"], header = True, inferSchema = True)
rescue_df <- sparklyr::spark_read_csv(sc,
path = config$rescue_path_csv,
header = TRUE,
infer_schema = TRUE)
It’s important to note the two arguments we have provided to the spark.read.csv()
function, header
and inferSchema
.
By setting header
to True, we’re saying that we want the top row to be used as the column names. If we did not set this argument to True, then the top rows will be treated as the first row of data, and columns will be given a default name of _c1
, _c2
, _c3
and so on.
inferSchema
is very important - a disadvantage of using a CSV file is that they are not associated with a schema in the same way parquet files are. By setting inferSchema
to True, we’re allowing the PySpark API to attempt to work out the schemas based on the contents of each column. If this were set to False, then each column would be set to a string datatype by default.
Note that inferSchema
may not always give the result you’re expecting - we can see this in the DateTimeOfCall column in the below code. We may want this as a timestamp type, but it has been read in as a string.
animal_rescue.printSchema()
sparklyr::sdf_schema(rescue_df)
root
|-- IncidentNumber: string (nullable = true)
|-- DateTimeOfCall: string (nullable = true)
|-- CalYear: integer (nullable = true)
|-- FinYear: string (nullable = true)
|-- TypeOfIncident: string (nullable = true)
|-- PumpCount: double (nullable = true)
|-- PumpHoursTotal: double (nullable = true)
|-- HourlyNotionalCost(£): integer (nullable = true)
|-- IncidentNotionalCost(£): double (nullable = true)
|-- FinalDescription: string (nullable = true)
|-- AnimalGroupParent: string (nullable = true)
|-- OriginofCall: string (nullable = true)
|-- PropertyType: string (nullable = true)
|-- PropertyCategory: string (nullable = true)
|-- SpecialServiceTypeCategory: string (nullable = true)
|-- SpecialServiceType: string (nullable = true)
|-- WardCode: string (nullable = true)
|-- Ward: string (nullable = true)
|-- BoroughCode: string (nullable = true)
|-- Borough: string (nullable = true)
|-- StnGroundName: string (nullable = true)
|-- PostcodeDistrict: string (nullable = true)
|-- Easting_m: double (nullable = true)
|-- Northing_m: double (nullable = true)
|-- Easting_rounded: integer (nullable = true)
|-- Northing_rounded: integer (nullable = true)
$IncidentNumber
$IncidentNumber$name
[1] "IncidentNumber"
$IncidentNumber$type
[1] "StringType"
$DateTimeOfCall
$DateTimeOfCall$name
[1] "DateTimeOfCall"
$DateTimeOfCall$type
[1] "StringType"
$CalYear
$CalYear$name
[1] "CalYear"
$CalYear$type
[1] "IntegerType"
$FinYear
$FinYear$name
[1] "FinYear"
$FinYear$type
[1] "StringType"
$TypeOfIncident
$TypeOfIncident$name
[1] "TypeOfIncident"
$TypeOfIncident$type
[1] "StringType"
$PumpCount
$PumpCount$name
[1] "PumpCount"
$PumpCount$type
[1] "DoubleType"
$PumpHoursTotal
$PumpHoursTotal$name
[1] "PumpHoursTotal"
$PumpHoursTotal$type
[1] "DoubleType"
$HourlyNotionalCostGBP
$HourlyNotionalCostGBP$name
[1] "HourlyNotionalCostGBP"
$HourlyNotionalCostGBP$type
[1] "IntegerType"
$IncidentNotionalCostGBP
$IncidentNotionalCostGBP$name
[1] "IncidentNotionalCostGBP"
$IncidentNotionalCostGBP$type
[1] "DoubleType"
$FinalDescription
$FinalDescription$name
[1] "FinalDescription"
$FinalDescription$type
[1] "StringType"
$AnimalGroupParent
$AnimalGroupParent$name
[1] "AnimalGroupParent"
$AnimalGroupParent$type
[1] "StringType"
$OriginofCall
$OriginofCall$name
[1] "OriginofCall"
$OriginofCall$type
[1] "StringType"
$PropertyType
$PropertyType$name
[1] "PropertyType"
$PropertyType$type
[1] "StringType"
$PropertyCategory
$PropertyCategory$name
[1] "PropertyCategory"
$PropertyCategory$type
[1] "StringType"
$SpecialServiceTypeCategory
$SpecialServiceTypeCategory$name
[1] "SpecialServiceTypeCategory"
$SpecialServiceTypeCategory$type
[1] "StringType"
$SpecialServiceType
$SpecialServiceType$name
[1] "SpecialServiceType"
$SpecialServiceType$type
[1] "StringType"
$WardCode
$WardCode$name
[1] "WardCode"
$WardCode$type
[1] "StringType"
$Ward
$Ward$name
[1] "Ward"
$Ward$type
[1] "StringType"
$BoroughCode
$BoroughCode$name
[1] "BoroughCode"
$BoroughCode$type
[1] "StringType"
$Borough
$Borough$name
[1] "Borough"
$Borough$type
[1] "StringType"
$StnGroundName
$StnGroundName$name
[1] "StnGroundName"
$StnGroundName$type
[1] "StringType"
$PostcodeDistrict
$PostcodeDistrict$name
[1] "PostcodeDistrict"
$PostcodeDistrict$type
[1] "StringType"
$Easting_m
$Easting_m$name
[1] "Easting_m"
$Easting_m$type
[1] "DoubleType"
$Northing_m
$Northing_m$name
[1] "Northing_m"
$Northing_m$type
[1] "DoubleType"
$Easting_rounded
$Easting_rounded$name
[1] "Easting_rounded"
$Easting_rounded$type
[1] "IntegerType"
$Northing_rounded
$Northing_rounded$name
[1] "Northing_rounded"
$Northing_rounded$type
[1] "IntegerType"
To correct this, we can either cast the column as a date type or we can provide a schema when reading it in.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
custom_schema = StructType([
StructField("IncidentNumber", StringType(), True),
StructField("DateTimeOfCall", TimestampType(), True),
StructField("CalYear", IntegerType(), True),
StructField("TypeOfIncident", StringType(), True),])
animal_rescue_with_schema = spark.read.csv(path = config["rescue_path_csv"], header = True, schema = custom_schema)
animal_rescue_with_schema.printSchema()
custom_schema = list(IncidentNumber = "character",
DateTimeOfCall = "date",
CalYear = "integer",
TypeOfIncident = "character")
rescue_df <- sparklyr::spark_read_csv(sc,
path = config$rescue_path_csv,
header = TRUE,
infer_schema = FALSE,
columns = custom_schema)
sparklyr::sdf_schema(rescue_df)
root
|-- IncidentNumber: string (nullable = true)
|-- DateTimeOfCall: timestamp (nullable = true)
|-- CalYear: integer (nullable = true)
|-- TypeOfIncident: string (nullable = true)
$IncidentNumber
$IncidentNumber$name
[1] "IncidentNumber"
$IncidentNumber$type
[1] "StringType"
$DateTimeOfCall
$DateTimeOfCall$name
[1] "DateTimeOfCall"
$DateTimeOfCall$type
[1] "DateType"
$CalYear
$CalYear$name
[1] "CalYear"
$CalYear$type
[1] "IntegerType"
$TypeOfIncident
$TypeOfIncident$name
[1] "TypeOfIncident"
$TypeOfIncident$type
[1] "StringType"
We can see that DateTimeOfCall has now been read in correctly as a timestamp type. Also, providing a schema will improve the efficiency of the operation. This is because, in order to infer a schema, Spark needs to scan the dataset. A column could contain, for example, an integer for the first 1000 rows and a string for the 1001th row - in which case it would be inferred as a string, but this wouldn’t be obvious from the first 1000 rows. Needing to sample the data in each column can be quite memory intensive. Providing a schema means that Spark no longer has to sample the data before reading it in.
Generally, if you’re using a small dataset it is fine to use infer schema - however if you’re reading in a large dataset, it may take considerably longer to read in and this will be repeated after every action (see Persisting in Spark to understand why this is the case), potentially increasing your execution time even further.
Writing out CSV files#
To write a Spark dataframe to a CSV file, you can use the .write.csv()
method in PySpark, or spark_write_csv()
in SparklyR, as demonstrated below:
animal_rescue.write.csv(config["temp_outputs"] + "animal_rescue.csv", header = True, mode = "overwrite")
sparklyr::spark_write_csv(rescue_df,
paste0(config$temp_outputs, "animal_rescue_r.csv"),
header = TRUE,
mode = 'overwrite')
Note that as mentioned above, we need to specify header = True
for Spark to treat the top row as column names.
To overwrite an existing file, you can specify mode = overwrite
. Other writing modes include append
which appends the data to the pre-existing file file, ignore
which ignores the write operation if the file already exists or error
- this is the default and will error if the file already exists.
Parquet files#
Reading in a parquet file#
To read in a parquet file in PySpark, you can use spark.read.parquet()
as demonstrated below. For SparklyR, you would use the spark_read_parquet()
method.
animal_rescue = spark.read.parquet(config["rescue_path"])
rescue_df <- sparklyr::spark_read_parquet(sc, path = config$rescue_path)
As you can see, we didn’t have to provide a schema or use the inferSchema argument - this is because parquet files already have a schema associated with them, which is stored in the metadata.
Writing out parquet files#
To write out a Spark dataframe as a parquet file, you can use .write.parquet()
in PySpark or spark_write_parquet()
in SparklyR.
animal_rescue.write.parquet(config["temp_outputs"] + "animal_rescue.parquet", mode = "overwrite")
sparklyr::spark_write_parquet(rescue_df, paste0(config$temp_outputs, "animal_rescue_r.parquet"), mode = 'overwrite')
There is no need to specify the header
argument as with CSV files when saving out or reading in a parquet file - because of the metadata associated with parquet files, Spark knows whether something is a column header or not. As with CSV files, you can specify alternative modes for saving out the data including append
and ignore
.
As well as having a schema associated with them, there are other benefits of working with parquet files instead of CSV files. These include:
Column-based format - parquet files are organised by columns, rather than by row. This allows for better compression and more efficient use of storage space, as columns typically contain similar data types and repeating values. Additionally, when accessing only specific columns, Spark can skip reading in unnecessary data and only read in the columns of interest.
Predicate pushdown - parquet supports predicate pushdowns, this means if you read in the full dataset and then filter, the filter clause will be “pushed down” to where the data is stored, meaning it can be filtered before it is read in, reducing the amount of memory used to process the data.
Compression - parquet has built-in compression methods to reduce the required storage space.
Complex data types - parquet files support complex data types such as nested data.
Optimised Row Columnar (ORC) files#
Reading in ORC files#
To read in an ORC file, using PySpark, you can use spark.read.orc()
. Using SparklyR, you would use spark_read_orc()
animal_rescue = spark.read.orc(config["rescue_path_orc"])
rescue_df <- sparklyr::spark_read_orc(sc, path = config$rescue_path_orc)
Writing out ORC files#
To write out an ORC file, you can use dataframe.write.orc()
in PySpark or spark_write_orc()
in SparklyR.
animal_rescue.write.orc(config["temp_outputs"] + "animal_rescue.orc", mode = "overwrite")
sparklyr::spark_write_orc(rescue_df, paste0(config$temp_outputs, "animal_rescue_r.orc"), mode = 'overwrite')
Again, there is no need to use inferSchema
or specify a header
argument, as with a CSV file.
An ORC file shares a lot of the same benefits that a parquet file has over a CSV, including:
The schema is stored with the file, meaning we don’t have to specify a schema
Column-based formatting
Predicate pushdown support
Built-in compression
Support of complex data types
Avro files#
Reading in Avro files#
To read in an Avro file using PySpark, you can use spark.read.format("avro")
.
Note that unlike other methods, Spark doesn’t have a built in spark.read.avro()
method so we need to use a slightly different method to read this in, by first specifying the format as “avro” and then using .load()
to read in the file. Note that this method would also work for other formats, such as spark.read.format("parquet").load(...)
but is slightly more verbose than the other methods demonstrated.
Using SparklyR, you will need to install the sparkavro
package and use the spark_read_avro()
function. Note that this function requires three arguments: the spark connection name, a name to assign the newly read in table (in this case “animal_rescue”), and the path to the avro file.
animal_rescue = spark.read.format("avro").load(config["rescue_path_avro"])
library(sparkavro)
animal_rescue = sparkavro::spark_read_avro(sc, "animal_rescue", config$rescue_path_avro)
Writing out Avro files#
To write out an Avro file, you can use dataframe.write.format("avro").save()
in PySpark. Note that like the method to read in Avro files, we need to specify the format and then use save
to specify that we want to save this output.
In SparklyR, we can use the spark_write_avro()
function. This only requires two arguments: the name of the dataframe to be written to file and the output path.
animal_rescue.write.mode("overwrite").format("avro").save(config["temp_outputs"] + "animal_rescue.avro")
sparkavro::spark_write_avro(animal_rescue, paste0(config$temp_outputs, "animal_rescue.avro"), mode = "overwrite")
Also note that for these methods, we pass in the mode as write.mode("overwrite")
/mode = "overwrite"
.
While an Avro file has many of the benefits associated with parquet and ORC files, such as being associated with a schema and having built-in compression methods, there is one key difference: An Avro file is row-based, not column-based.
See the section on “Which file type should you use?” for a discussion on row-based formats vs column-based formats.
Hive tables#
Reading in Hive tables#
To read in a Hive table, we can use one of the following approaches:
#using spark.sql
animal_rescue = spark.sql("SELECT * FROM train_tmp.animal_rescue")
#using spark.read.table
animal_rescue = spark.read.table("train_tmp.animal_rescue")
rescue_df <- sparklyr::sdf_sql(sc, "SELECT * FROM train_tmp.animal_rescue")
Both of the PySpark methods achieve the same thing - you can use the SQL approach if you want to combine it with additional queries or if you’re more familiar with SQL syntax but the spark.read.table approach will achieve the same end result.
One thing to note is that we first specify the database name and then the name of the table. This is because Hive tables are stored within databases. This isn’t necessary if you’re already in the correct database - you can specify which database you’re working in by using spark.sql("USE database_name")
, so we could also read in the dataset using this code:
spark.sql("USE train_tmp")
animal_rescue = spark.read.table("animal_rescue")
Writing out a Hive table#
To write out a Hive table, we can use dataframe.write.mode('overwrite').saveAsTable()
or spark_write_table
in SparklyR.
animal_rescue.write.mode('overwrite').saveAsTable("train_tmp.animal_rescue_temp")
sparklyr::spark_write_table(rescue_df, name = "train_tmp.animal_rescue_temp", mode = 'overwrite')
Again, notice that the output format is database.table_name, although, as mentioned above, we don’t need to include the database name if we have ensured we’re working in the correct database using spark.sql("USE database_name)
first.
In many ways, Hive tables have a lot of the same benefits that a parquet file has, such as the storing of schemas and supporting predicate pushdowns. In fact, a Hive table may consist of parquet files - a parquet file is often the default underlying file structure Spark uses when saving out a Hive table. A Hive table can consist of any of the underlying data formats we’ve discussed in this chapter. The key difference with a Hive table compared to the other data formats is that a Hive table tends to store more detailed metadata.
So, which file format should I use?#
There are a number of factors to consider when deciding which file format to use.
Do I need my data to be human-readable?#
Of the file formats discussed in this chapter, only CSV files are human-readable. This means that if you need to look at the data yourself, i.e. for quality assurance purposes, you would need to ensure that your data is in a CSV file. However, CSV files are generally less efficient to read in than other file formats such as parquet or ORC files, particularly if you only require a subset of the available columns.
It’s also important to note that Spark is a big data solution, so if you’re only working with only a small amount of data that needs to be manually examined by a human, it may be worth reconsidering whether Spark is needed at all - it could be a good idea to read when to use Spark.
Row-based vs columnar-based formats#
Generally, both row-based and columnar-based formats are fine to use with Spark. The benefits of choosing one or the other are highly dependent on downstream processing and would likely result in relatively small improvements in processing speed. For a more in-depth discussion on which types of downstream processing would be more suited to a row-based or a columnar-based file format, this article may be useful.
Do you work primarily with databases/SQL?#
If you’re primarily working with databases/tables within databases and SQL, it may be a good idea to use a Hive table. You can use any format as the underlying data format within a Hive table - so it may be worthwhile reviewing the data formats presented in this chapter to decide which format would be most appropriate for your use case.
Partitions when writing data#
Although we have not fully discussed dataframe partitions in this page, we should consider partitions when wrtiting data to a disk. As mentioned in the Managing Partitons page we are able to partiton data by any column when wrtitng to disk. The reason why this is useful is we can choose which partitions we want to read in later. This is really useful for larger data sets.
Before we demonstrate this by writing the animal_rescue
DataFrame as parquet files, we need to remove the £
signs from the column names as these are not supported by the parquet format. In sparklyr they are replaced automatically when reading in the csv file, so no need for this step.
for col in animal_rescue.columns:
if '£' in col:
new_name = col.replace('(£)','GBP')
animal_rescue = animal_rescue.withColumnRenamed(col, new_name)
Next let’s create a file path and write the animal_rescue
DataFrame to disk in parquet format by partitioning the data in terms of CalYear
,
repartition_path = config["checkpoint_path"] + "/rescue_by_year.parquet"
animal_rescue.write.mode('overwrite').partitionBy('cal_year').parquet(repartition_path)
repartition_path <- paste0(config$checkpoint_path, "/rescue_by_year.parquet")
sparklyr::spark_write_parquet(rescue_df,
repartition_path,
mode='overwrite',
partition_by='cal_year')
This will create multiple directories in the rescue_by_year.parquet
directory on the file system, one for each year in the data.
The easiest way to see this is by navigating to these directories using the file browser in HUE. Alternatively we can use the subprocess
package to run lines of code through the terminal to return the contents of the rescue_by_year.parquet
directory.
import subprocess
cmd = f"hdfs dfs -ls -C {repartition_path}"
p = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, universal_newlines=True)
print(p.stdout)
cmd <- paste0("hdfs dfs -ls -C ", repartition_path)
system(cmd)
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2009
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2010
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2011
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2012
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2013
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2014
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2015
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2016
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2017
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2018
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/CalYear=2019
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/_SUCCESS
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/_SUCCESS
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2009
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2010
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2011
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2012
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2013
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2014
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2015
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2016
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2017
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2018
file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet/cal_year=2019
On the right of the ouput above you will see there is one directory for each CalYear
. So to import a subset of the data we can use the specific path for that year or filter the data in Spark and let Spark work out which folders to look for.
Finally, we will delete these files to clean up the file system.
cmd = f"hdfs dfs -rm -r -skipTrash {repartition_path}"
p = subprocess.run(cmd, shell=True)
cmd <- paste0("hdfs dfs -rm -r -skipTrash ", repartition_path)
system(cmd)
Deleted file:///home/cdsw/ons-spark/checkpoints/rescue_by_year.parquet
Further resources#
Spark at the ONS Articles:
Spark Documentation: