Apache Spark writer partitionBy causes OOM

A dataset of Parquet files totaling more than 700 GB is available. The Parquet files consist of two columns, each containing a JSON document. I would now like to convert these Parquet files and save them with partitions: read, transform, and save. At the end there should be a new folder with partitions and the corresponding Parquet files. So much for the idea.

Reading the source data is done with spark.read.parquet("/my/folder/**/*.parquet"). The transformation is applied to this DataFrame with some JSON helper methods. Afterwards, a DataFrame with several columns is available: in addition to a date (YYYY-MM-DD) there are other columns, and the original data is also still present. For writing, I execute a repartitionByRange("date", "col1", "col2"), a sortWithinPartitions("date", "col1") and a write.partitionBy("date").

My small Spark cluster (6 workers, each with 4 cores and 2 GB RAM) is now busy for a few hours. When writing, however, there is always an OOM. My driver (spark-shell) is equipped with 24 GB RAM and the machine does not offer more. The files can be processed fine individually; my problem seems to be the amount of data. My guess: merging the partial results from the workers leads to an OOM in the driver. I have also experimented with the maxRecordsPerFile option, unfortunately without success. What other options are there to avoid the OOM?

archiveDF
  .repartitionByRange($"xxxx", $"startTime", $"uuid") // !!! causes OOM !!!
  .sortWithinPartitions("xxxx", "startTime")
  .write
  .mode("append")
  .option("maxRecordsPerFile", 50000)
  .partitionBy("xxxx")
  .format("parquet")
  .save("/long-term-archive/data-store")


Solution 1:[1]

When you use repartitionByRange (in Spark 3.2.1 anyway) without supplying a desired number of partitions, Spark uses spark.sql.shuffle.partitions (which is 200 by default) as the number of partitions you end up with.
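
You can check and change that setting from a spark-shell session; a small sketch (the value set here only anticipates the calculation below):

// Default number of shuffle partitions that repartitionByRange falls back to
spark.conf.get("spark.sql.shuffle.partitions")   // "200" unless overridden

// Alternatively, raise the default instead of passing an explicit count
spark.conf.set("spark.sql.shuffle.partitions", "7000")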

If you divide your total data by the number of partitions, you get 700 GB / 200 partitions = 3.5 GB/partition. That is very large (partitions of roughly 100 MB are usually a good target), and your executors have only 0.5 GB RAM per core. So in your case, you could try using 7000 partitions and see if that gives a better result. Something like:

.repartitionByRange(7000, $"xxxx", $"startTime", $"uuid")
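
Plugged back into the write chain from the question, that would look roughly like this sketch:

archiveDF
  .repartitionByRange(7000, $"xxxx", $"startTime", $"uuid")   // explicit count: ~100 MB per partition for ~700 GB
  .sortWithinPartitions("xxxx", "startTime")
  .write
  .mode("append")
  .option("maxRecordsPerFile", 50000)
  .partitionBy("xxxx")
  .format("parquet")
  .save("/long-term-archive/data-store")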

I'm assuming there is no huge data skew on those repartitioning keys. If there is a large skew, you may have to salt your keys.
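
If salting does become necessary, a minimal sketch could look like the following; the salt column name and the factor of 10 are arbitrary choices for illustration:

import org.apache.spark.sql.functions.rand

// Add a random salt so rows that share a skewed key value are spread
// across several range partitions instead of piling up in one huge one.
val salted = archiveDF.withColumn("salt", (rand() * 10).cast("int"))

// Include the salt in the range keys; the rest of the write stays unchanged.
salted.repartitionByRange(7000, $"xxxx", $"salt", $"startTime", $"uuid")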

Another question: what is the point of repartitioning again right before writing? Repartitioning requires a shuffle, which is typically one of the most expensive operations there is. You should aim to repartition as few times as possible.
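
If reasonably sized files per xxxx directory are all that is needed, a sketch without the extra shuffle might already be enough (same column names and path as in the question):

// sortWithinPartitions only sorts rows inside the existing partitions and
// does not shuffle; partitionBy still creates one directory per xxxx value,
// and maxRecordsPerFile caps the size of each output file.
archiveDF
  .sortWithinPartitions("xxxx", "startTime")
  .write
  .mode("append")
  .option("maxRecordsPerFile", 50000)
  .partitionBy("xxxx")
  .format("parquet")
  .save("/long-term-archive/data-store")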

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1 (Stack Overflow)