{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Pydoop: HDFS to pandas\n", "\n", "The usual way to interact with data stored in the Hadoop Distributed File System (HDFS) is to use Spark.\n", "\n", "Some datasets are small enough that they can be easily handled with pandas. One method is to start a Spark session, read in the data as a PySpark DataFrame with [`spark.read.csv()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html), then convert it to a pandas DataFrame with [`.toPandas()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html).\n", "\n", "The [Pydoop](https://crs4.github.io/pydoop/) package allows you to bypass Spark and read the data directly into a pandas DataFrame. Remember that your data will have to fit into the driver memory, so do not use this for big datasets. Guidance on when to use Spark and when to consider alternatives is in the [When To Use Spark](../spark-overview/when-to-use-spark) article.\n", "\n", "### Pydoop Setup\n", "\n", "Pydoop can be installed in the same way as any other package, e.g. with `pip install pydoop`. If using CDSW, use `pip3 install` to ensure that Python 3 is being used.\n", "\n", "Then import `hdfs` from Pydoop, as well as pandas; note that PySpark is not being imported:" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pydoop.hdfs as hdfs\n", "import pandas as pd" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Reading files\n", "\n", "This example will use a CSV stored in the ONS training area on HDFS. You can also read in other file types that are supported by pandas, e.g. [json](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json.html) or [Excel](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html).\n", "\n", "Reading in the data is then a two-stage process: first open the file with `hdfs.open()`, then read it in as a pandas DataFrame with [`pd.read_csv()`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html). If a `with` statement is used, you do not need to explicitly close the file with `f.close()`." ] },
{ "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "file_path = \"/training/animal_rescue.csv\"\n", "with hdfs.open(file_path, \"r\") as f:\n", "    pandas_df = pd.read_csv(f)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "`pandas_df` is now a pandas DataFrame loaded into the driver memory, and all the usual pandas methods will work.\n", "\n", "For example, we can preview the first five rows and columns of the DataFrame with [`.iloc`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.iloc.html) and [`.head()`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.head.html):" ] },
{ "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "  IncidentNumber    DateTimeOfCall  CalYear  FinYear   TypeOfIncident\n", "0         139091  01/01/2009 03:01     2009  2008/09  Special Service\n", "1         275091  01/01/2009 08:51     2009  2008/09  Special Service\n", "2        2075091  04/01/2009 10:07     2009  2008/09  Special Service\n", "3        2872091  05/01/2009 12:27     2009  2008/09  Special Service\n", "4        3553091  06/01/2009 15:23     2009  2008/09  Special Service" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pandas_df.iloc[:, :5].head()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Get the count of rows and columns with [`.shape`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.shape.html); as we are using pandas rather than Spark, the concept of lazy evaluation does not apply:" ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(5898, 26)" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pandas_df.shape" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Writing files\n", "\n", "You can write files with Pydoop in a similar way to reading them. One advantage of this is that the CSV will be written as one file, whereas using [`df.write.csv()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.csv.html) in PySpark will [write out a partitioned CSV](../spark-functions/writing-data).\n", "\n", "Another difference is that CSV files from pandas will have an index written by default, which you may want to remove.\n", "\n", "First of all, get the path to write out to; e.g. if using the ONS development and testing environment, the following code will get a path to your own user area on HDFS:" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "import os\n", "username = os.getenv(\"HADOOP_USER_NAME\")\n", "write_path = f\"/user/{username}/rescue_copy.csv\"" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Similar to reading, use `hdfs.open()`, then [`.to_csv()`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html). Importantly, we are using `wt` for the `mode` option when opening the file, where `w` means `write` and `t` means `text`." ] },
{ "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "with hdfs.open(write_path, \"wt\") as f:\n", "    pandas_df.to_csv(f, index=False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The file will now be stored on HDFS; to check, you can read it back in or open it with a user interface such as Hadoop User Experience (HUE).\n", "\n", "### Other Useful Functions\n", "\n", "Pydoop has lots of functions; see the [documentation](https://crs4.github.io/pydoop/index.html) for a full list.
A couple of examples are given below.\n", "\n", "#### `isfile()`, `isdir()` and `ls()`: See if a file/directory exists and list files\n", "\n", "`hdfs.path.isfile(file_path)` will return `True` if the file exists:" ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "hdfs.path.isfile(write_path)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "It returns `False` if the file does not exist:" ] },
{ "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "False" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "hdfs.path.isfile(\"/not/a_file\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "`isdir()` works in a similar way, but for directories rather than files.\n", "\n", "`ls()` lists the files in the specified directory. You may be familiar with this command from Unix.\n", "\n", "#### `getsize()`: File size in bytes\n", "\n", "`getsize()` returns the file size in bytes. For large files, bytes are often not a practical unit, so you may want to divide the result to convert it to KB or MB:" ] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1.79 MB\n" ] } ], "source": [ "size = hdfs.path.getsize(write_path) / (1024 ** 2)\n", "print(f\"{round(size, 2)} MB\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "`rm()` is used to delete files and directories, and is another command you may know from Unix. Be careful: there is no recovery option with this!" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "hdfs.rm(write_path)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Verify that this has been removed by checking that `isfile()` returns `False`:" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "False" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "hdfs.path.isfile(write_path)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Limitations\n", "\n", "Remember that your pandas DataFrame is stored in the driver memory, rather than on the Spark cluster, so some larger datasets cannot be read in this way. You can instead read in the data using `spark.read.csv()`, subset it by filtering or [taking a sample](../spark-functions/sampling), then convert to pandas with `.toPandas()`; a sketch of this approach is given below.\n", "\n", "CSV files that are saved from PySpark will often be partitioned, so to use this method you would have to write a loop that reads in all the files in the directory (also sketched below); in this case you may prefer to read and convert with PySpark first.\n", "\n", "This method works for reading in files from a directory on HDFS, but not for Hive tables. In theory you could read the underlying parquet files, but it is much easier to read the table with [`spark.read.table()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.table.html) in PySpark and then convert with `.toPandas()`." ] },
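{ "cell_type": "markdown", "metadata": {}, "source": [ "As a rough sketch of the first approach: read the CSV with PySpark, take a sample, then convert the result to pandas. The app name, sampling fraction and seed below are arbitrary choices for illustration:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only: read with PySpark, sample to reduce the size, then convert to pandas\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.appName(\"hdfs-to-pandas\").getOrCreate()\n", "\n", "sdf = spark.read.csv(\"/training/animal_rescue.csv\", header=True, inferSchema=True)\n", "sample_pandas_df = sdf.sample(fraction=0.1, seed=42).toPandas()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "And a minimal sketch of looping over a partitioned CSV directory with Pydoop; here `partitioned_path` is a hypothetical directory written by `df.write.csv()`, assumed to contain uncompressed `.csv` part files:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only: partitioned_path is a hypothetical directory written by df.write.csv()\n", "part_files = [p for p in hdfs.ls(partitioned_path) if p.endswith(\".csv\")]\n", "\n", "dfs = []\n", "for part in part_files:\n", "    with hdfs.open(part, \"r\") as f:\n", "        dfs.append(pd.read_csv(f))\n", "\n", "combined_df = pd.concat(dfs, ignore_index=True)" ] },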
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Further Resources\n", "\n", "Spark at the ONS Articles:\n", "- [When To Use Spark](../spark-overview/when-to-use-spark)\n", "- [Writing Data](../spark-functions/writing-data)\n", "- [Sampling](../spark-functions/sampling)\n", "\n", "PySpark Documentation:\n", "- [`spark.read.csv()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html)\n", "- [`.toPandas()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html)\n", "- [`.write.csv()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.csv.html)\n", "- [`spark.read.table()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.table.html)\n", "\n", "Pydoop Documentation:\n", "- [Pydoop](https://crs4.github.io/pydoop/)\n", "\n", "pandas Documentation:\n", "- [`pd.read_csv()`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n", "- [`pd.read_json()`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json.html)\n", "- [`pd.read_excel()`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html)\n", "- [`.iloc`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.iloc.html)\n", "- [`.head()`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.head.html)\n", "- [`.shape`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.shape.html)\n", "- [`.to_csv()`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }