Why does the Spark query plan show more partitions whenever cache (persist) is used?

Given this PySpark code on a single-worker Spark cluster with 2 cores:

df = spark.table('table')
df = df.dropDuplicates(['checksum'])
df = df.cache()
...
df.write.save('...')

Spark generates and executes a plan with 200 partitions when the cache is present, and with only 2 partitions when df.cache() is absent.

I am particularly interested in the impact cache has on planning in this case.

With cache: [screenshot of the query plan with df.cache]

Without cache: [screenshot of the query plan without cache]
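For reference, the difference can be inspected by printing the physical plan in both cases; a minimal sketch, assuming the same placeholder table name as in the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.table('table')  # placeholder table name from the question
df = df.dropDuplicates(['checksum'])

# Without cache: AQE may rewrite the shuffle read at runtime.
df.explain()

# With cache: count() forces materialization; the plan now scans the
# in-memory relation, which was built with the full shuffle partitioning.
df.cache().count()
df.explain()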

df.cache seems to have a similar impact on AQE (Adaptive Query Execution): coalescing of post-shuffle partitions does not seem to occur if the DataFrame is cached after an expensive shuffle.
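For completeness, AQE and its post-shuffle coalescing are governed by standard Spark 3.x settings; a sketch of checking that they are enabled (both default to true in Spark 3.2+, and enabling them does not change the cached-plan behaviour observed above):

# Both settings default to true in recent Spark 3.x releases.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Verify the current values.
print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))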



Solution 1:[1]

This is an effect of AQE. See the Custom Shuffle Reader node in your 2nd image.

"...it executes CustomShuffleReaderExec instead of the ShuffleExchangeExec and the ShuffledRowRDD is executed with map-based shuffle files that should already be present in the local block manager (hence local shuffle reader)" https://www.waitingforcode.com/apache-spark-sql/what-new-apache-spark-3-local-shuffle-reader/read

The 200 partitions come from the default value of the spark.sql.shuffle.partitions configuration.
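As a workaround sketch (not part of the original answer), the shuffle partition count can be lowered for the session, or the DataFrame can be repartitioned explicitly before caching:

# Lower the default shuffle partition count for this session.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Or control the partitioning explicitly before caching.
df = df.repartition(2).cache()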

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Richard EB