Understanding shuffle managers in Spark
Let me try to clarify shuffle in depth and how Spark uses shuffle managers. Here are some very helpful resources:
https://trongkhoanguyenblog.wordpress.com/
https://0x0fff.com/spark-architecture-shuffle/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md
Reading them, I understood that there are different shuffle managers. I want to focus on two of them: the hash manager and the sort manager (which is the default).
To frame my question, I want to start from a very common transformation:
val rdd2 = rdd.reduceByKey(_ + _)
This transformation performs map-side aggregation and then shuffles the data so that all records with the same key end up in the same partition.
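Conceptually, the two phases can be sketched in plain Scala without Spark (a toy model; the partition contents are made up for illustration, and Spark's real combine path goes through `Aggregator`, not `groupMapReduce`):

```scala
// Two hypothetical input partitions of (key, value) records.
val part1 = Seq(("a", 1), ("b", 1), ("a", 1))
val part2 = Seq(("a", 1), ("b", 1))

// Map-side aggregation: each task merges the values for the keys it sees
// locally, so at most one record per key per partition crosses the network.
def combineLocally(part: Seq[(String, Int)]): Map[String, Int] =
  part.groupMapReduce(_._1)(_._2)(_ + _)

val combined = Seq(part1, part2).map(combineLocally)

// Reduce side: after the shuffle brings identical keys together, the
// per-partition partial results are merged into the final totals.
val result = combined.flatten.groupMapReduce(_._1)(_._2)(_ + _)
// result: Map(a -> 3, b -> 2)
```

The point of the map-side step is purely to shrink the shuffled data: the merge function is applied once per partition before the shuffle and once again after it.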
My questions are:

1. Is map-side aggregation implemented internally with a mapPartitions transformation, aggregating all identical keys using the combiner function, or is it implemented with an AppendOnlyMap or ExternalAppendOnlyMap?
2. If AppendOnlyMap or ExternalAppendOnlyMap maps are used for aggregating, are they also used for the reduce-side aggregation that happens in the ResultTask?
3. What exactly is the purpose of these two kinds of maps (AppendOnlyMap, ExternalAppendOnlyMap)?
4. Are AppendOnlyMap and ExternalAppendOnlyMap used by all shuffle managers, or just by the sort manager?
5. I read that after an AppendOnlyMap or ExternalAppendOnlyMap is full, it is spilled into a file. How exactly does this step happen?
6. Using the sort shuffle manager, we use an AppendOnlyMap for aggregating and combining partition records, right? Then when execution memory fills up, we start sorting the map, spilling it to disk, and then cleaning up the map. My question is: what is the difference between a spill to disk and a shuffle write? Both basically consist of creating files on the local file system, but they are treated differently, and shuffle-write records are not put into the AppendOnlyMap.
7. Can you explain in depth what happens when reduceByKey is executed, covering all the steps involved, e.g. map-side aggregation, shuffling, and so on?
Solution 1:
Here is a step-by-step description of reduceByKey:
- reduceByKey calls combineByKeyWithClassTag, with an identity combiner and identical merge-value and create-value functions.
- combineByKeyWithClassTag creates an Aggregator and returns a ShuffledRDD. Both "map"- and "reduce"-side aggregations use this internal mechanism and don't utilize mapPartitions.
- The Aggregator uses an ExternalAppendOnlyMap for both combineValuesByKey ("map-side reduction") and combineCombinersByKey ("reduce-side reduction").
- Both methods use the ExternalAppendOnlyMap.insertAll method.
- ExternalAppendOnlyMap keeps track of the spilled parts and the current in-memory map (a SizeTrackingAppendOnlyMap).
- insertAll updates the in-memory map and, on each insert, checks whether the estimated size of the current map exceeds the threshold. It uses the inherited Spillable.maybeSpill method. If the threshold is exceeded, maybeSpill calls spill as a side effect, and insertAll initializes a clean SizeTrackingAppendOnlyMap.
- spill calls spillMemoryIteratorToDisk, which gets a DiskBlockObjectWriter object from the block manager.

The insertAll steps are applied for both map- and reduce-side aggregation, with the corresponding Aggregator functions and the shuffle stage in between.
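The insert/maybeSpill/spill cycle described above can be modeled with a toy class (a simplified sketch only: the names mirror Spark's internals, but the real ExternalAppendOnlyMap tracks estimated byte sizes rather than entry counts, writes sorted files via DiskBlockObjectWriter, and merge-sorts the spills on read):

```scala
import scala.collection.mutable

// Toy analogue of ExternalAppendOnlyMap: merges values on insert,
// "spills" the in-memory map when it grows past a threshold.
class ToyExternalMap[K, V](mergeValue: (V, V) => V, threshold: Int) {
  private var current = mutable.Map.empty[K, V]     // in-memory map
  val spills = mutable.ListBuffer.empty[Map[K, V]]  // stand-in for spill files

  def insertAll(records: Iterator[(K, V)]): Unit =
    records.foreach { case (k, v) =>
      current.updateWith(k) {                       // combine with existing value
        case Some(old) => Some(mergeValue(old, v))
        case None      => Some(v)
      }
      maybeSpill()
    }

  // Analogue of Spillable.maybeSpill: here a simple entry-count check,
  // where Spark compares an estimated size in bytes against a threshold.
  private def maybeSpill(): Unit =
    if (current.size >= threshold) {
      spills += current.toMap   // spillMemoryIteratorToDisk analogue
      current = mutable.Map.empty[K, V]  // continue with a clean map
    }

  // Reading back merges the current map with all spilled parts.
  def iterator: Iterator[(K, V)] =
    (spills :+ current.toMap).flatten
      .groupMapReduce(_._1)(_._2)(mergeValue).iterator
}

val m = new ToyExternalMap[String, Int](_ + _, 2)
m.insertAll(Iterator(("a", 1), ("b", 1), ("a", 1), ("c", 1)))
val merged = m.iterator.toMap
// merged: Map(a -> 2, b -> 1, c -> 1), reassembled from spills + memory
```

This also answers the spill-vs-shuffle-write question at the level of the model: spills are intermediate files produced whenever memory fills up mid-aggregation, while the shuffle write is the final, partitioned output produced once per task after all spills and the remaining in-memory data have been merged.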
As of Spark 2.0 there is only the sort-based manager: SPARK-14667.
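As a side note: before 2.0, the manager could be selected via the `spark.shuffle.manager` configuration property (e.g. in spark-defaults.conf); this setting is moot on 2.0+, where the sort-based manager is the only implementation:

```
# pre-Spark-2.0 only; removed along with the hash manager in SPARK-14667
spark.shuffle.manager  hash
```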
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
