How does a PySpark join happen on DataFrames that are already suitably partitioned?

With the example of join:

A typical workflow for a Spark join is:

  1. Shuffle both datasets so that rows with the same join key land in the same partition
  2. Sort each partition by the join key
  3. Merge-join the matching partitions

What if I use repartition with the same number of partitions and the same merge key on both datasets before the join?

Then the join should not need a shuffle, since I have already achieved that layout.

How does PySpark know this? Does the user tell it explicitly (and if so, how?), or does PySpark verify it by iterating over all the keys in all the partitions?

Is this true for all wide transformations? If I use repartition beforehand, how does Spark decide not to shuffle?



Solution 1:[1]

The following code joins two DataFrames that are already suitably partitioned, with the spark.sql.shuffle.partitions parameter set to match for good measure. I compared Spark 2.4.5 and 3.1.2.

%python

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import rand, randn
df1 = sqlContext.range(0, 10000000)
df2 = sqlContext.range(0, 10000000)
df3 = sqlContext.range(0, 10000000)

# Bump up numbers
df1 = df1.select("id", (20 * rand(seed=10)).cast(IntegerType()).alias("v1"))
df2 = df2.select("id", (50 * randn(seed=27)).cast(IntegerType()).alias("v1"))
df3 = df3.select("id", (50 * randn(seed=27)).cast(IntegerType()).alias("v2"))

df1rc = df1.repartition(23, "v1")
df2rc = df2.repartition(6, "v1")
df3rc = df3.repartition(23, "v2")

# Checkpoint to truncate lineage; the planner then sees the repartitioned data.
spark.sparkContext.setCheckpointDir("/foo/bar")
df1rc = df1rc.checkpoint()
df2rc = df2rc.checkpoint()
df3rc = df3rc.checkpoint()

spark.conf.set("spark.sql.shuffle.partitions", 23)
df1rc.join(df3rc, df1rc.v1 == df3rc.v2).explain()

.explain() prints the physical plan in 2.4.5 as below. Catalyst (non-AQE) correctly avoids a shuffle, because both DataFrames have the same partitioner (albeit on different columns) and the same number of partitions:

== Physical Plan ==
*(3) SortMergeJoin [v1#84], [v2#90], Inner
:- *(1) Sort [v1#84 ASC NULLS FIRST], false, 0
:  +- *(1) Filter isnotnull(v1#84)
:     +- *(1) Scan ExistingRDD[id#78L,v1#84]
+- *(2) Sort [v2#90 ASC NULLS FIRST], false, 0
   +- *(2) Filter isnotnull(v2#90)
      +- *(2) Scan ExistingRDD[id#82L,v2#90]

.explain() prints the physical plan in 3.1.2 as below, in which we see hash partitioning, i.e. a shuffle being applied. To me that looks like a bug: ENSURE_REQUIREMENTS appears to add a redundant shuffle in this case.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [v1#91], [v2#97], Inner
   :- Sort [v1#91 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(v1#91, 23), ENSURE_REQUIREMENTS, [id=#331]
   :     +- Filter isnotnull(v1#91)
   :        +- Scan ExistingRDD[id#85L,v1#91]
   +- Sort [v2#97 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(v2#97, 23), ENSURE_REQUIREMENTS, [id=#332]
         +- Filter isnotnull(v2#97)
            +- Scan ExistingRDD[id#89L,v2#97]

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
