'Spark joinWith repartitions an already partitioned Dataset

Let's say we have two partitioned datasets

    val partitionedPersonDS = personDS.repartition(200, personDS("personId"))
    val partitionedTransactionDS = transactionDS.repartition(200, transactionDS("personId"))

And we try to join them using joinWith on the same key over which they are partitioned

    val transactionPersonDS: Dataset[(Transaction, Person)] = partitionedTransactionDS
      .joinWith(
        partitionedPersonDS,
        partitionedTransactionDS.col("personId") === partitionedPersonDS.col("personId")
      )

The Physical plan shows that the already partitioned Dataset's were repartitioned as part of the Sort Merge Join

InMemoryTableScan [_1#14, _2#15]
   +- InMemoryRelation [_1#14, _2#15], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(5) SortMergeJoin [_1#14.personId], [_2#15.personId], Inner
            :- *(2) Sort [_1#14.personId ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(_1#14.personId, 200)
            :     +- *(1) Project [named_struct(transactionId, transactionId#8, personId, personId#9, itemList, itemList#10) AS _1#14]
            :        +- Exchange hashpartitioning(personId#9, 200)
            :           +- LocalTableScan [transactionId#8, personId#9, itemList#10]
            +- *(4) Sort [_2#15.personId ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(_2#15.personId, 200)
                  +- *(3) Project [named_struct(personId, personId#2, name, name#3) AS _2#15]
                     +- Exchange hashpartitioning(personId#2, 200)
                        +- LocalTableScan [personId#2, name#3]

But when we perform the join using join the already partitioned Dataset's were NOT repartitioned and only Sorted as part of the Sort Merge Join

      val transactionPersonDS: DataFrame = partitionedTransactionDS
        .join (
          partitionedPersonDS,
          partitionedTransactionDS("personId") === partitionedPersonDS("personId")
        )
InMemoryTableScan [transactionId#8, personId#9, itemList#10, personId#2, name#3]
   +- InMemoryRelation [transactionId#8, personId#9, itemList#10, personId#2, name#3], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(3) SortMergeJoin [personId#9], [personId#2], Inner
            :- *(1) Sort [personId#9 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(personId#9, 200)
            :     +- LocalTableScan [transactionId#8, personId#9, itemList#10]
            +- *(2) Sort [personId#2 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(personId#2, 200)
                  +- LocalTableScan [personId#2, name#3]

Why joinWith fails to honor a pre-partitioned dataset unlike join



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source