Cross Joins#

A cross join returns every combination of the rows of two DataFrames. Cross joins are also referred to as the Cartesian product of two DataFrames. They differ from other types of join, which match rows by the values in their join key columns.

As a cross join returns every combination of the rows, the size of the returned DataFrame is equal to the product of the row counts of the two source DataFrames; this can quickly get large and overwhelm your Spark session. As such, use them carefully!

The syntax for cross joins is different in PySpark and sparklyr. In PySpark, DataFrames have a .crossJoin() method. In sparklyr, use full_join() with by=character().
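
As a minimal sketch of the PySpark syntax (hypothetical data, assuming an active Spark session named spark, such as the one created in the example below):

# Hypothetical data: cross joining 2 rows with 3 rows returns 2 * 3 = 6 rows
sizes = spark.createDataFrame([("S",), ("M",)], ["size"])
colours = spark.createDataFrame([("red",), ("blue",), ("green",)], ["colour"])
sizes.crossJoin(colours).show()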

One use case for cross joins is to return every combination when producing results that involve grouping and aggregation, even when some of the values are zero. Cross joins are also commonly used in salted joins, which improve the efficiency of a join when the join keys are skewed.

Example: producing all combinations of results#

First, start a Spark session (disabling broadcast joins by default), read in the Animal Rescue data, group by animal_group and cal_year, and then get the sum of total_cost:

import subprocess
import yaml
from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder.master("local[2]")
         .appName("joins")
         # Disable Broadcast join by default
         .config("spark.sql.autoBroadcastJoinThreshold", -1)
         .getOrCreate())

with open("../../../config.yaml") as f:
    config = yaml.safe_load(f)
    
rescue_path = config["rescue_path"]

rescue = (spark.read.parquet(rescue_path)
          .groupBy("animal_group", "cal_year")
          .agg(F.sum("total_cost").alias("cost")))

rescue.show(5, truncate=False)
+--------------------------------+--------+-------+
|animal_group                    |cal_year|cost   |
+--------------------------------+--------+-------+
|Snake                           |2018    |333.0  |
|Deer                            |2015    |1788.0 |
|Unknown - Domestic Animal Or Pet|2017    |2622.0 |
|Bird                            |2012    |32240.0|
|Hedgehog                        |2009    |520.0  |
+--------------------------------+--------+-------+
only showing top 5 rows

If there are no animals rescued in a particular year then there are no rows to sum, and so nothing will be returned. In some cases we would prefer a zero value to be returned, rather than an empty DataFrame.

For instance, no data will be returned for Hamster and 2012:

rescue.filter((F.col("animal_group") == "Hamster") & (F.col("cal_year") == 2012)).show()
+------------+--------+----+
|animal_group|cal_year|cost|
+------------+--------+----+
+------------+--------+----+

A cross join can be used here to return every combination of animal_group and cal_year, which then serves as a base DataFrame to which rescue can be joined. First, create DataFrames of the unique values of animal_group and cal_year and sort them:

animals = rescue.select("animal_group").distinct().orderBy("animal_group")
cal_years = rescue.select("cal_year").distinct().orderBy("cal_year")

print(f"Distinct animal count: {animals.count()}")
print(f"Distinct year count: {cal_years.count()}")
Distinct animal count: 27
Distinct year count: 11

There can be issues in Spark when joining DataFrames that are derived from the same source DataFrame; these can be resolved by checkpointing to break the lineage of the DataFrames. See the article on checkpointing for more information.

checkpoint_path = config["checkpoint_path"]
spark.sparkContext.setCheckpointDir(checkpoint_path)

animals = animals.checkpoint()
cal_years = cal_years.checkpoint()

Now cross join cal_years and animals. In PySpark, use .crossJoin(), which is a method applied to the DataFrame and takes one argument, the DF to cross join to. In sparklyr, there is no native cross join function, so use full_join() with by=character().

Note that, apart from the order of the columns, the result will be the same regardless of which DF is on the left and which is on the right. The result DF will have \(27 \times 11 = 297\) rows.

result = animals.crossJoin(cal_years).orderBy("animal_group", "cal_year")
result.count()
297

Previewing result, we can see that every combination has been returned:

result.show(15)
+------------+--------+
|animal_group|cal_year|
+------------+--------+
|        Bird|    2009|
|        Bird|    2010|
|        Bird|    2011|
|        Bird|    2012|
|        Bird|    2013|
|        Bird|    2014|
|        Bird|    2015|
|        Bird|    2016|
|        Bird|    2017|
|        Bird|    2018|
|        Bird|    2019|
|      Budgie|    2009|
|      Budgie|    2010|
|      Budgie|    2011|
|      Budgie|    2012|
+------------+--------+
only showing top 15 rows

Now left join rescue to result, to return the cost. Combinations which do not exist in rescue will return a null cost.

Note that the DF has been reordered during the join, as a sort merge join was used rather than a broadcast join. See the article on optimising joins for more information on this.

result = result.join(rescue, on=["animal_group", "cal_year"], how="left")
result.show(10)
+--------------------+--------+-------+
|        animal_group|cal_year|   cost|
+--------------------+--------+-------+
|                Deer|    2015| 1788.0|
|               Snake|    2018|  333.0|
|Unknown - Animal ...|    2011|   null|
|                Goat|    2013|   null|
|                Bird|    2012|32240.0|
|            Hedgehog|    2009|  520.0|
|               Sheep|    2013|   null|
|Unknown - Domesti...|    2017| 2622.0|
|              Ferret|    2017|   null|
|               Horse|    2012|16120.0|
+--------------------+--------+-------+
only showing top 10 rows

The final stage is to change any null values to zero and re-order the DF.

We can now filter on Hamster to see that 2012 is included and returned with a zero cost:

result = result.fillna(0, "cost").orderBy("animal_group", "cal_year")
result.filter(F.col("animal_group") == "Hamster").show()
+------------+--------+------+
|animal_group|cal_year|  cost|
+------------+--------+------+
|     Hamster|    2009|   0.0|
|     Hamster|    2010| 780.0|
|     Hamster|    2011| 780.0|
|     Hamster|    2012|   0.0|
|     Hamster|    2013| 870.0|
|     Hamster|    2014| 295.0|
|     Hamster|    2015|   0.0|
|     Hamster|    2016|1630.0|
|     Hamster|    2017|   0.0|
|     Hamster|    2018|   0.0|
|     Hamster|    2019|   0.0|
+------------+--------+------+

Accidental Cross Joins#

Cross joins can be dangerous due to the size of the DataFrame that they return. In PySpark, it is possible to create a cross join accidentally when using a regular join in which the key column has only one distinct value. In these cases, Spark will sometimes plan the join as a cross join and raise an error, depending on the lineage of the DataFrame. You can disable this error with .config("spark.sql.crossJoin.enabled", "true").

Note that this error is specific to Spark 2; in Spark 3, spark.sql.crossJoin.enabled is true by default, and so the error will only occur if you explicitly set it to false.
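
As an illustrative sketch, assuming a Spark 3 session in which this setting can still be read and set at runtime, the default can be checked and overridden with spark.conf:

# Assumption: a Spark 3 session, where the setting defaults to true
print(spark.conf.get("spark.sql.crossJoin.enabled"))
# Setting it to false reinstates the Spark 2 behaviour described above
spark.conf.set("spark.sql.crossJoin.enabled", "false")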

This error will not occur in sparklyr.

To demonstrate this in PySpark (version 2.4.0), create a new DF, dogs, with an id column and an animal_group column containing only the value Dog, and join another DF to it:

from pyspark.sql.utils import AnalysisException

dogs = spark.range(5).withColumn("animal_group", F.lit("Dog"))

animal_noise = spark.createDataFrame([
    ["Cat", "Meow"],
    ["Dog", "Woof"],
    ["Cow", "Moo"]],
    ["animal_group", "animal_noise"])

try:
    dogs.join(animal_noise, on="animal_group", how="left").show()
except AnalysisException as e:
    print(e)
'Detected implicit cartesian product for LEFT OUTER join between logical plans\nRange (0, 5, step=1, splits=Some(2))\nand\nProject [animal_noise#203]\n+- Filter (isnotnull(animal_group#202) && (Dog = animal_group#202))\n   +- LogicalRDD [animal_group#202, animal_noise#203], false\nJoin condition is missing or trivial.\nEither: use the CROSS JOIN syntax to allow cartesian products between these\nrelations, or: enable implicit cartesian products by setting the configuration\nvariable spark.sql.crossJoin.enabled=true;'

To resolve this, start a new Spark session and add .config("spark.sql.crossJoin.enabled", "true") to the config. The DFs will have been cleared by closing the Spark session and so need to be created again.

spark.stop()

spark = (SparkSession.builder.master("local[2]")
         .appName("joins")
         .config("spark.sql.crossJoin.enabled", "true")
         .getOrCreate())

dogs = spark.range(5).withColumn("animal_group", F.lit("Dog"))

animal_noise = spark.createDataFrame([
    ["Cat", "Meow"],
    ["Dog", "Woof"],
    ["Cow", "Moo"]],
    ["animal_group", "animal_noise"])

dogs.join(animal_noise, on="animal_group", how="left").show()
+------------+---+------------+
|animal_group| id|animal_noise|
+------------+---+------------+
|         Dog|  0|        Woof|
|         Dog|  1|        Woof|
|         Dog|  2|        Woof|
|         Dog|  3|        Woof|
|         Dog|  4|        Woof|
+------------+---+------------+

Finally, clear the checkpoints that were used in the code:

p = subprocess.run(f"hdfs dfs -rm -r -skipTrash {checkpoint_path}", shell=True)
Deleted file:///home/cdsw/ons-spark/checkpoints

Salted Joins#

Cross joins can also be used when salting a join. Salting improves the efficiency of a join by introducing new join keys to reduce the skew in the DataFrame. See the article on salted joins for an example.
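
As a rough sketch of the idea only (hypothetical big and small DataFrames; see the salted joins article for the full method), a cross join replicates the smaller side once per salt value:

from pyspark.sql import functions as F

# Hypothetical skewed data: most rows of big share the key "a"
big = spark.createDataFrame([("a",)] * 8 + [("b",)], ["key"])
small = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])

num_salts = 3

# Cross join replicates every row of small once per salt value
salts = spark.range(num_salts).withColumnRenamed("id", "salt")
small_salted = small.crossJoin(salts)

# Give each row of big a random salt, spreading the hot key "a"
# over num_salts distinct join keys
big_salted = big.withColumn("salt", (F.rand() * num_salts).cast("long"))

big_salted.join(small_salted, on=["key", "salt"], how="left").drop("salt").show()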

Further Resources#

Spark at the ONS Articles:

  • Checkpointing
  • Optimising Joins
  • Salted Joins

PySpark Documentation:

  • .crossJoin()

sparklyr Documentation:

  • full_join()

Other links:

  • GitHub: SPARK-28621: pull request for setting spark.sql.crossJoin.enabled to be true by default in Spark 3