Spark joinWith repartitions an already partitioned Dataset
Let's say we have two Datasets, each repartitioned on personId:
val partitionedPersonDS = personDS.repartition(200, personDS("personId"))
val partitionedTransactionDS = transactionDS.repartition(200, transactionDS("personId"))
We then join them with joinWith on the same key they are partitioned on:
val transactionPersonDS: Dataset[(Transaction, Person)] = partitionedTransactionDS
  .joinWith(
    partitionedPersonDS,
    partitionedTransactionDS.col("personId") === partitionedPersonDS.col("personId")
  )
The physical plan shows that the already partitioned Datasets are repartitioned again as part of the Sort Merge Join (note the second Exchange on each side, above the Project):
InMemoryTableScan [_1#14, _2#15]
   +- InMemoryRelation [_1#14, _2#15], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(5) SortMergeJoin [_1#14.personId], [_2#15.personId], Inner
            :- *(2) Sort [_1#14.personId ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(_1#14.personId, 200)
            :     +- *(1) Project [named_struct(transactionId, transactionId#8, personId, personId#9, itemList, itemList#10) AS _1#14]
            :        +- Exchange hashpartitioning(personId#9, 200)
            :           +- LocalTableScan [transactionId#8, personId#9, itemList#10]
            +- *(4) Sort [_2#15.personId ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(_2#15.personId, 200)
                  +- *(3) Project [named_struct(personId, personId#2, name, name#3) AS _2#15]
                     +- Exchange hashpartitioning(personId#2, 200)
                        +- LocalTableScan [personId#2, name#3]
But when we perform the same join with join, the already partitioned Datasets are NOT repartitioned; they are only sorted as part of the Sort Merge Join:
val transactionPersonDS: DataFrame = partitionedTransactionDS
  .join(
    partitionedPersonDS,
    partitionedTransactionDS("personId") === partitionedPersonDS("personId")
  )
InMemoryTableScan [transactionId#8, personId#9, itemList#10, personId#2, name#3]
   +- InMemoryRelation [transactionId#8, personId#9, itemList#10, personId#2, name#3], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(3) SortMergeJoin [personId#9], [personId#2], Inner
            :- *(1) Sort [personId#9 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(personId#9, 200)
            :     +- LocalTableScan [transactionId#8, personId#9, itemList#10]
            +- *(2) Sort [personId#2 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(personId#2, 200)
                  +- LocalTableScan [personId#2, name#3]
Why does joinWith fail to honor the existing partitioning of a pre-partitioned Dataset, unlike join?
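For anyone who wants to reproduce the two plans side by side, here is a minimal sketch. Several things in it are assumptions, not taken from the question: the field types of Person and Transaction (only the column names appear in the plans), the sample rows, the object name JoinWithPlans, the local master, and the disabling of broadcast joins (without it, data this small would most likely be planned as a broadcast join, not a Sort Merge Join). The plans above also show an InMemoryRelation, which suggests the result was cached before the plan was printed; the sketch skips that step. The exact output depends on the Spark version.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical schemas: column names come from the plans, field types are assumed.
case class Person(personId: Int, name: String)
case class Transaction(transactionId: Int, personId: Int, itemList: Seq[String])

object JoinWithPlans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                                   // assumption: local run
      .appName("joinWith-vs-join-plans")
      .config("spark.sql.autoBroadcastJoinThreshold", "-1") // assumption: force a non-broadcast join
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows, only there so that a plan can be built.
    val personDS: Dataset[Person] = Seq(Person(1, "a"), Person(2, "b")).toDS()
    val transactionDS: Dataset[Transaction] =
      Seq(Transaction(10, 1, Seq("x")), Transaction(11, 2, Seq("y"))).toDS()

    // Same repartitioning as in the question.
    val partitionedPersonDS = personDS.repartition(200, personDS("personId"))
    val partitionedTransactionDS = transactionDS.repartition(200, transactionDS("personId"))

    // Typed join: each side is wrapped in a struct column (_1, _2).
    val viaJoinWith: Dataset[(Transaction, Person)] = partitionedTransactionDS.joinWith(
      partitionedPersonDS,
      partitionedTransactionDS("personId") === partitionedPersonDS("personId")
    )

    // Untyped join: the original columns are kept flat.
    val viaJoin: DataFrame = partitionedTransactionDS.join(
      partitionedPersonDS,
      partitionedTransactionDS("personId") === partitionedPersonDS("personId")
    )

    // Print both physical plans; compare the Exchange nodes under each Sort.
    viaJoinWith.explain()
    viaJoin.explain()

    spark.stop()
  }
}
```

Comparing the two printed plans makes it easy to check whether the extra Exchange above the Project still appears on a given Spark version.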
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
