{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Set Spark Job Description\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The Spark UI is a great tool for: monitoring Spark applications; troubleshooting slow jobs; and generally understanding how spark will execute your code. However, it is sometimes difficult to find the correct job number to drill down on a problem. \n", "\n", "There is a way to set the Spark job description when using Pyspark, making use of the `spark.sparkContext.setJobDescription()` function. This function takes a string input and will update the description column in the spark UI with the string. While this makes finding the job in the UI much easier, if we do not update the descriptions in our script or tell spark to [revert back to default descriptions](#back-to-default-description) each job will be given the same description. \n", "\n", "We will work through a short example to highlight its effectiveness. First we will start a Spark session and then generate the link to the Spark UI.\n", "It should be noted that the link to the spark UI in this notebook will not work as it linked to a specific spark session. \n", "You will need to rerun the code with an active spark session to generate a link for yourself." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Spark UI" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import yaml\n", "from pyspark.sql import SparkSession\n", "import pyspark.sql.functions as F\n", "import os, IPython\n", "\n", "spark = SparkSession.builder.master(\"local[2]\").appName(\"job-description\").getOrCreate()\n", "\n", "url = \"spark-%s.%s\" % (os.environ[\"CDSW_ENGINE_ID\"], os.environ[\"CDSW_DOMAIN\"])\n", "IPython.display.HTML(\"Spark UI\" % url)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Default descriptions\n", "By default spark will generate job descriptions based on the action that has been called.\n", "Now we will read in some data to perform some transformations and some actions to create a few Spark jobs." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "5898" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "with open(\"../../config.yaml\") as f:\n", " config = yaml.safe_load(f)\n", "\n", "rescue_path = config[\"rescue_path_csv\"]\n", "\n", "rescue = (spark.read.csv(rescue_path, header=True, inferSchema=True)\n", " .withColumnRenamed('AnimalGroupParent','AnimalGroup')\n", " .select('IncidentNumber','FinalDescription','AnimalGroup','CalYear')\n", ")\n", "\n", "rescue.count()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+-----------------------------------------+-----------+-------+\n", "|IncidentNumber|FinalDescription |AnimalGroup|CalYear|\n", "+--------------+-----------------------------------------+-----------+-------+\n", "|139091 |DOG WITH JAW TRAPPED IN MAGAZINE RACK,B15|Dog |2009 |\n", "|275091 |ASSIST RSPCA WITH FOX TRAPPED,B15 |Fox |2009 |\n", "|2075091 |DOG CAUGHT IN DRAIN,B15 |Dog |2009 |\n", "|2872091 |HORSE TRAPPED IN LAKE,J17 |Horse |2009 |\n", "|3553091 |RABBIT TRAPPED UNDER SOFA,B15 |Rabbit |2009 |\n", "+--------------+-----------------------------------------+-----------+-------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "rescue.sort(\"CalYear\").show(5, truncate=False)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(IncidentNumber='000014-03092018M', AnimalGroup='Unknown - Heavy Livestock Animal'),\n", " Row(IncidentNumber='000099-01012017', AnimalGroup='Dog'),\n", " Row(IncidentNumber='000260-01012017', AnimalGroup='Bird')]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rescue.select(\"IncidentNumber\", \"AnimalGroup\").sort(\"IncidentNumber\").limit(3).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can check the Spark UI. As we can see, a number of jobs have been created, but which action created each job?\n", "\n", "![Spark UI, jobs tab showing default descriptions for the jobs](../images/Job_description_default.png)\n", "\n", "In the *Description* column we can see the description starts with the action that created the job. This is useful to identify the correct job, but we can do one better.\n", "\n", "## Customised description\n", "\n", "We can customise the job description using `spark.sparkContext.setJobDescription()`. This is useful as we can assign a more detailed name to each job within our code. Doing so will help us understand and find the exact action which has created a job in the spark UI." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "238" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.setJobDescription(\"Count fox incidents\")\n", "\n", "rescue.filter(F.col(\"AnimalGroup\") == \"Fox\").count()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "540" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.setJobDescription(\"Count incidents in 2015\")\n", "\n", "rescue.filter(F.col(\"CalYear\") == 2015).count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Looking at the UI again you can see our descriptions have been updated to contain our customised names.\n", "\n", "![Spark UI, jobs tab showing default descriptions and customised descriptions for the jobs](../images/Job_description_custom.png)\n", "\n", "**WARNING** every job from now on will have the last description we set, unless we tell spark to default back to the default descriptions.\n", "\n", "## Back to default description\n", "\n", "We can set the description to `None` to revert back to the default descriptions. This should be done if you do not plan on adding additional descriptions details into your script." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+-----------------------------------------+-----------+-------+\n", "|IncidentNumber|FinalDescription |AnimalGroup|CalYear|\n", "+--------------+-----------------------------------------+-----------+-------+\n", "|139091 |DOG WITH JAW TRAPPED IN MAGAZINE RACK,B15|Dog |2009 |\n", "|275091 |ASSIST RSPCA WITH FOX TRAPPED,B15 |Fox |2009 |\n", "|2075091 |DOG CAUGHT IN DRAIN,B15 |Dog |2009 |\n", "|2872091 |HORSE TRAPPED IN LAKE,J17 |Horse |2009 |\n", "|3553091 |RABBIT TRAPPED UNDER SOFA,B15 |Rabbit |2009 |\n", "+--------------+-----------------------------------------+-----------+-------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "spark.sparkContext.setJobDescription(None)\n", "\n", "rescue.show(5, truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The job description for the most recent job has been revered to its default:\n", "\n", "![Spark UI, jobs tab showing default and customised descriptions for the jobs](../images/Job_description_Back_to_default.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "\n", "- Default job descriptions tell you the action used to trigger the job\n", "- Set a job description to better track jobs in the Spark UI using `spark.sparkContext.setJobDescription()` \n", "- Remember to set the description to `None` to go back to using default descriptions once you've finished tracking your jobs. \n", "- The description will carry through to the Stages tab also, but will not appear in the SQL tab." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Further Resources\n", "\n", "Spark at the ONS Articles:\n", "- [Spark Application and UI](../spark-concepts/spark-application-and-ui)\n", "\n", "PySpark Documentation:\n", "- [`setJobDescription`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setJobDescription.html?highlight=setjob)\n", "- [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)\n", "- [`.count()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.count.html)\n", "- [`.groupBy()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html)\n", "- [`.show()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.show.html)\n", "\n", "Spark documentation:\n", "- [Monitoring and Instrumentation](https://spark.apache.org/docs/latest/monitoring.html)\n", "- [Spark Web UI](https://spark.apache.org/docs/latest/web-ui.html)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 4 }