How to process many different small files in Spark while improving performance

We are dealing with multiple small files, each approximately 100 MB in size.

file1.parquet (100 MB)
file2.parquet (105 MB)
file3.parquet (107 MB)
file4.parquet (98 MB)

val file1Df = spark.read.parquet(file1)
val file2Df = spark.read.parquet(file2)
val file3Df = spark.read.parquet(file3)
val file4Df = spark.read.parquet(file4)

val output12Df = file1Df.join(file2Df, Seq("col1"), "left")
val output123Df = output12Df.join(file3Df, Seq("col2"), "left")
val output1234Df = output123Df.join(file4Df, Seq("col3"), "left")
val outputDf = output12Df.join(output1234Df, Seq("col1", "col4"), "left")

As shown above, we have many similar joins among small DataFrames, and we are encountering performance issues due to repeated shuffles and data skew.

  • Although the files are small, broadcasting the datasets either causes memory issues or forces us to increase driver memory.

Please advise on how to improve performance (reduce shuffles and make proper use of resources) when processing many small DataFrames.



Solution 1:[1]

You can try Adaptive Query Execution (AQE), available since Spark 3.0:

spark.sql.adaptive.enabled = true
spark.sql.autoBroadcastJoinThreshold = 1G # Favors broadcast joins to avoid shuffles; increase executor memory accordingly
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 32M

For more details, please refer to https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
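
As a minimal sketch (not part of the original answer), the settings above could be applied when building the SparkSession; the application name is a placeholder and the values simply mirror those listed above:

import org.apache.spark.sql.SparkSession

// Sketch: enable AQE and its skew-join handling up front (Spark 3.x).
val spark = SparkSession.builder()
  .appName("small-file-joins") // placeholder name
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.autoBroadcastJoinThreshold", "1g")
  .config("spark.sql.adaptive.skewJoin.enabled", "true") // on by default in Spark 3.x
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "32m")
  .getOrCreate()

// The same settings can also be changed on an existing session at runtime:
spark.conf.set("spark.sql.adaptive.enabled", "true")

With these settings the join chain from the question can stay as written; AQE decides at runtime whether a side is small enough to broadcast and splits skewed shuffle partitions, which should address the shuffle and skew issues described above.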

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Warren Zhu