Pydoop: HDFS to pandas#
The usual way to interact with data stored in the Hadoop Distributed File System (HDFS) is to use Spark.
Some datasets are small enough that they can be easily handled with pandas. One method is to start a Spark session, read in the data as a PySpark DataFrame with spark.read.csv(), then convert to a pandas DataFrame with .toPandas().
The Pydoop package allows you to bypass Spark and read in the data directly to a pandas DataFrame. Remember that your data will have to be able to fit into the driver memory, so do not use this for big datasets. Guidance on when to use Spark and when to consider alternatives is in the When To Use Spark article.
Pydoop Setup#
Pydoop can be installed in the same way as any other package, e.g. with pip install pydoop. If using CDSW you need to use pip3 install to ensure that Python 3 is being used.
Then import hdfs from Pydoop, as well as pandas; note that PySpark is not being imported:
import pydoop.hdfs as hdfs
import pandas as pd
Reading files#
This example will use a CSV stored in the ONS training area on HDFS. You can also read in other file types that are supported by pandas, e.g. JSON or Excel.
Reading in the data is then a two-stage process: first open the file with hdfs.open(), then read it in as a pandas DataFrame with pd.read_csv(). If a with statement is used you do not need to explicitly close the file with f.close().
file_path = "/training/animal_rescue.csv"
with hdfs.open(file_path, "r") as f:
    pandas_df = pd.read_csv(f)
pandas_df is now a pandas DataFrame loaded in the driver memory and all the usual methods will work.
For example, we can preview the first five rows and columns of the DataFrame with .iloc and .head():
pandas_df.iloc[:, :5].head()
 | IncidentNumber | DateTimeOfCall | CalYear | FinYear | TypeOfIncident |
---|---|---|---|---|---|
0 | 139091 | 01/01/2009 03:01 | 2009 | 2008/09 | Special Service |
1 | 275091 | 01/01/2009 08:51 | 2009 | 2008/09 | Special Service |
2 | 2075091 | 04/01/2009 10:07 | 2009 | 2008/09 | Special Service |
3 | 2872091 | 05/01/2009 12:27 | 2009 | 2008/09 | Special Service |
4 | 3553091 | 06/01/2009 15:23 | 2009 | 2008/09 | Special Service |
Get the count of rows and columns with the .shape attribute; as we are in pandas, not Spark, the concept of lazy evaluation does not apply:
pandas_df.shape
(5898, 26)
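The two-stage read works because pd.read_csv() accepts any open file-like object, not only a path. The sketch below uses io.StringIO as a local stand-in for the handle returned by hdfs.open() (the sample rows are invented for illustration and are not taken from the rescue data). It also shows the usecols and nrows options, which are one way to limit how much data is loaded into the driver memory.

```python
import io

import pandas as pd

# Invented sample data; io.StringIO stands in for the file object
# that hdfs.open(file_path, "r") would return.
csv_text = "id,animal,cost\n1,Cat,255\n2,Dog,260\n3,Bird,290\n"
fake_hdfs_file = io.StringIO(csv_text)

# usecols keeps only the named columns; nrows stops reading after that many rows
small_df = pd.read_csv(fake_hdfs_file, usecols=["id", "cost"], nrows=2)
print(small_df.shape)  # (2, 2)
```

With Pydoop, the same options would be passed as pd.read_csv(f, usecols=..., nrows=...) inside the with block.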
Writing files#
You can write files with Pydoop in a similar way to reading them. One advantage of this is that the CSV will be written as one file, whereas using df.write.csv() in PySpark will write out a partitioned CSV.
Another difference is that CSV files from pandas will have an index written by default, which you may want to remove.
First of all, get the path to write out to; e.g. if using the ONS development and testing environment the following code will get a path to your own user area on HDFS:
import os
username = os.getenv("HADOOP_USER_NAME")
write_path = f"/user/{username}/rescue_copy.csv"
Similar to reading, use hdfs.open(), then .to_csv(). Importantly, we are using wt for the mode option when opening the file, where w means write and t means text.
with hdfs.open(write_path, "wt") as f:
    pandas_df.to_csv(f, index=False)
The file will now be stored on HDFS; to check you can read it in, or open it using a user interface such as Hadoop User Experience (HUE).
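To illustrate the index behaviour mentioned in this section, the minimal sketch below writes a small invented DataFrame to an in-memory buffer (io.StringIO, standing in for the file opened with hdfs.open()) with and without index=False, then compares the header lines.

```python
import io

import pandas as pd

# Invented two-row DataFrame, for illustration only
df = pd.DataFrame({"animal": ["Cat", "Dog"], "cost": [255, 260]})

with_index = io.StringIO()
df.to_csv(with_index)  # default: the row index is written as an unnamed first column

without_index = io.StringIO()
df.to_csv(without_index, index=False)  # the index column is omitted

header_with = with_index.getvalue().splitlines()[0]
header_without = without_index.getvalue().splitlines()[0]
print(repr(header_with))     # ',animal,cost'
print(repr(header_without))  # 'animal,cost'
```

The leading comma in the first header is the unnamed index column; if it is left in, it is read back as an extra column when the file is loaded again.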
Other Useful Functions#
Pydoop has lots of functions; see the documentation for a full list. A couple of examples are given below.
isfile(), isdir() and ls(): See if a file/directory exists and list files#
hdfs.path.isfile(path) will return True if a file exists:
hdfs.path.isfile(write_path)
True
It returns False if the file does not exist:
hdfs.path.isfile("/not/a_file")
False
isdir() works in a similar way, but for directories rather than files.
ls() lists files in the specified directory. You may be familiar with this command from Unix.
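No example is given here for isdir() or ls(), so the following is a minimal sketch combining them. list_directory is a hypothetical helper, not part of Pydoop; its fs argument is assumed to be the pydoop.hdfs module imported earlier as hdfs.

```python
def list_directory(directory, fs):
    """Return the sorted contents of `directory`, or raise if it is not a directory.

    `fs` is assumed to be the pydoop.hdfs module (import pydoop.hdfs as hdfs).
    """
    if not fs.path.isdir(directory):
        raise NotADirectoryError(f"{directory} is not a directory")
    # ls() returns a list of paths, one per file or sub-directory
    return sorted(fs.ls(directory))
```

It could then be called as list_directory(f"/user/{username}", hdfs), using the username variable from the writing example.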
getsize(): File size in bytes#
getsize() returns the file size in bytes. For large files, bytes are often not a practical unit, so you may want to divide the result to get the size in KB or MB:
size = hdfs.path.getsize(write_path) / (1024 ** 2)
print(f"{round(size, 2)} MB")
1.79 MB
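As a sketch of the unit conversion, the hypothetical helper below picks a readable unit automatically, using the same 1024-based division as above. It takes a plain byte count, so it could be given the value returned by hdfs.path.getsize().

```python
def format_size(n_bytes):
    """Format a byte count using 1024-based units (B, KB, MB, GB, TB)."""
    size = float(n_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        if size < 1024:
            return f"{round(size, 2)} {unit}"
        size /= 1024  # move up to the next unit
    return f"{round(size, 2)} TB"


print(format_size(1536))  # 1.5 KB
```

Checking the size in this way before calling pd.read_csv() is one way to judge whether a file is likely to fit in the driver memory.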
rm() is used to delete files and directories, and is another command you may know from Unix. Be careful, there is no recovery option with this!
hdfs.rm(write_path)
Verify that this has been removed by checking that isfile() returns False:
hdfs.path.isfile(write_path)
False
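Because deletion cannot be undone, one option is to wrap rm() in a guard. The sketch below is a hypothetical helper (not a Pydoop function) that only deletes when the path is an existing file, so a directory is never removed by accident. The fs argument is assumed to be the pydoop.hdfs module.

```python
def remove_file_only(path, fs):
    """Delete `path` only if it is an existing file; return True if it was deleted.

    `fs` is assumed to be the pydoop.hdfs module (import pydoop.hdfs as hdfs).
    """
    if not fs.path.isfile(path):
        # Missing paths and directories are left untouched
        return False
    fs.rm(path)
    return True
```

For example, remove_file_only(write_path, hdfs) would return True the first time and False if called again.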
Limitations#
Remember that your pandas DataFrame is stored in the driver memory, rather than on the Spark cluster, so some larger datasets cannot be read in this way. Instead, you can read in the data using spark.read.csv(), subset it by filtering or taking a sample, then convert to pandas with .toPandas().
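A minimal sketch of that Spark route follows. sample_to_pandas is a hypothetical helper; it assumes an existing SparkSession is passed in as spark and that the CSV has a header row. The fraction and seed defaults are arbitrary values chosen for illustration.

```python
def sample_to_pandas(spark, path, fraction=0.1, seed=42):
    """Read a CSV with Spark, take a random sample, and return it as a pandas DataFrame.

    `spark` is assumed to be an active SparkSession.
    """
    sdf = spark.read.csv(path, header=True, inferSchema=True)
    # sample() is lazy; the work happens when toPandas() collects rows to the driver
    sampled = sdf.sample(withReplacement=False, fraction=fraction, seed=seed)
    return sampled.toPandas()
```

Note that fraction is an approximate proportion of rows rather than an exact count, so the size of the returned DataFrame will vary slightly.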
CSV files saved from PySpark will often be partitioned. To use this method you would have to write a loop that reads in all the files in the directory, so in this case you may prefer to read and convert with PySpark first.
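If you do want to read a partitioned CSV with Pydoop, the loop could look something like the sketch below. read_partitioned_csv is a hypothetical helper, and fs is assumed to be the pydoop.hdfs module. It also assumes that Spark named the data files with its usual part- prefix, and that every part file is non-empty and was written with a header row.

```python
import posixpath

import pandas as pd


def read_partitioned_csv(directory, fs):
    """Read every part file in `directory` and combine them into one pandas DataFrame.

    `fs` is assumed to be the pydoop.hdfs module (import pydoop.hdfs as hdfs).
    """
    # Keep only the data files; this skips markers such as _SUCCESS
    part_paths = sorted(
        p for p in fs.ls(directory)
        if posixpath.basename(p).startswith("part-")
    )
    if not part_paths:
        raise FileNotFoundError(f"No part files found in {directory}")

    frames = []
    for part_path in part_paths:
        with fs.open(part_path, "r") as f:
            frames.append(pd.read_csv(f))
    # ignore_index gives one continuous index across all the parts
    return pd.concat(frames, ignore_index=True)
```

All the parts still have to fit in the driver memory together, so this is only suitable for small partitioned outputs.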
This method works for reading in files from a directory on HDFS, but not for Hive tables. In theory you can read the underlying parquet files, but it is much easier to read the table in with PySpark using spark.read.table(), then convert with .toPandas().
Further Resources#
Spark at the ONS Articles:
PySpark Documentation:
Pydoop Documentation:
pandas Documentation: