'Spark JOIN on 2 DF's with same Partitioner in 2.4.5 vs 3.1.2 appears to differ in approach, unfavourably for newer version
The following code which is to compare some aspects between Spark 2.4.5 & 3.1.2 in which we just JOIN 2 DF's with same Partitioner, and not what is a less good approach in v3.1.2:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import rand, randn
df1 = sqlContext.range(0, 10000000)
df2 = sqlContext.range(0, 10000000)
df3 = sqlContext.range(0, 10000000)
# Bump up numbers
df1 = df1.select("id", (20 * rand(seed=10)).cast(IntegerType()).alias("v1"))
df2 = df2.select("id", (50 * randn(seed=27)).cast(IntegerType()).alias("v1"))
df3 = df3.select("id", (50 * randn(seed=27)).cast(IntegerType()).alias("v2"))
df1rc = df1.repartition(23, "v1")
df2rc = df2.repartition(6, "v1")
df3rc = df3.repartition(23, "v2")
spark.sparkContext.setCheckpointDir("/foo/bar")
df1rc = df1rc.checkpoint()
df2rc = df2rc.checkpoint()
df3rc = df3rc.checkpoint()
spark.conf.set("spark.sql.shuffle.partitions", 23)
res = df1rc.join(df3rc, df1rc.v1 == df3rc.v2).explain()
.explain() returns Physical Plan in 2.4.5 below, this seems obvious the correct course to take by Catalyst (non-AQE) as both DF's have the same Partitioner (for a different column) and thus same number of Partitions:
== Physical Plan ==
*(3) SortMergeJoin [v1#84], [v2#90], Inner
:- *(1) Sort [v1#84 ASC NULLS FIRST], false, 0
: +- *(1) Filter isnotnull(v1#84)
: +- *(1) Scan ExistingRDD[id#78L,v1#84]
+- *(2) Sort [v2#90 ASC NULLS FIRST], false, 0
+- *(2) Filter isnotnull(v2#90)
+- *(2) Scan ExistingRDD[id#82L,v2#90]
.explain() returns Physical Plan in 3.1.2 below, in which we see hashpartitioning being applied. I looked the format up of the .explain(), but find little info. Is this the same Physical Plan as 2.4.5? It seems as though v3.1.2 cannot recognize the Partitioners are the same.
Does ENSURE_REQUIREMENTS in reality imply the same as v2.4.5?
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [v1#91], [v2#97], Inner
:- Sort [v1#91 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(v1#91, 23), ENSURE_REQUIREMENTS, [id=#331]
: +- Filter isnotnull(v1#91)
: +- Scan ExistingRDD[id#85L,v1#91]
+- Sort [v2#97 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(v2#97, 23), ENSURE_REQUIREMENTS, [id=#332]
+- Filter isnotnull(v2#97)
+- Scan ExistingRDD[id#89L,v2#97]
I do not think spark.sql.execution.sortBeforeRepartition plays a role here.
I looked at this as well https://towardsdatascience.com/should-i-repartition-836f7842298c
I think unnecessary shuffles are occurring.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
