Scala mutable buffer does not get updated
I have the following case classes and method:
```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD

case class Result(
  files: RDD[String],
  stats: Seq[Stats]
)

case class Stats(
  index: Int,
  size: Int
)

def convert(scopedResult: RDD[GenericRecord]): Result = {
  val stats = scala.collection.mutable.Buffer.empty[Stats]

  val converted = scopedResult.mapPartitionsWithIndex { (index, records) =>
    val header = "dummy"
    val lines = records.map { record =>
      ... // record-to-line conversion elided
    }.toSeq
    // Update stats
    stats += Stats(index, lines.size)
    println(s"Inside map partitions ${stats} with code ${stats.hashCode()}")
    (header +: lines).toIterator
  }

  println(s"Ready to produce results ${stats} with code ${stats.hashCode()}")
  Result(converted, stats)
}
```
Whenever I try to retrieve the stats from the Result, they are always empty... What is wrong here?
Initially I was using a mutable Seq to carry the stats, but that was creating a new Seq each time and I just wanted to update the existing one in place. Should I be using a different data structure altogether?
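For context, one alternative I have been considering instead of a driver-side buffer is a Spark `CollectionAccumulator`. This is only a rough sketch of what I mean (the record conversion is a placeholder since the real one is elided above, and `convertWithAccumulator` is just a name I made up); I am not sure it is the idiomatic way to carry per-partition stats:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.SparkContext
import org.apache.spark.util.CollectionAccumulator

// Sketch: gather per-partition stats through an accumulator instead of a
// driver-side mutable buffer. `sc` is assumed to be the SparkContext.
def convertWithAccumulator(sc: SparkContext, scopedResult: RDD[GenericRecord]): Result = {
  val statsAcc: CollectionAccumulator[Stats] = sc.collectionAccumulator[Stats]("stats")

  val converted = scopedResult.mapPartitionsWithIndex { (index, records) =>
    val header = "dummy"
    val lines  = records.map(record => record.toString).toSeq // placeholder conversion
    statsAcc.add(Stats(index, lines.size))                    // executor-side update
    (header +: lines).toIterator
  }

  // The accumulator is only populated once an action has run; updates made in a
  // transformation can also be applied more than once if a task is retried.
  converted.cache()
  converted.count()

  Result(converted, statsAcc.value.asScala.toSeq)
}
```

Even then I am not sure forcing an action with `count()` just to read the stats is a good idea, which is part of why I am asking whether a different data structure (or approach) is the right fix.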
EDIT: I added some print statements to debug, and it looks like the stats buffer being updated is NOT the one I defined initially. This is the output of the print statements:
```
Ready to produce results ArrayBuffer() with code 473519988
Inside map partitions ArrayBuffer(Stats(0,10)) with code 751510129
```
I do not really understand what Scala does here. Based on the Buffer docs, `+=`:

> Appends a single element to this buffer.

Does the Spark context mess everything up?
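Outside Spark, `+=` does exactly what the docs say, e.g. in the REPL, so the Buffer API itself does not seem to be the problem:

```scala
// Plain Scala, no Spark involved: += appends in place as documented.
val buf = scala.collection.mutable.Buffer.empty[Int]
buf += 1
buf += 2
println(buf) // ArrayBuffer(1, 2)
```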
My hypothesis is that Spark serializes the closure, including the stats buffer, when it ships the work to the executors, so each task mutates its own deserialized copy and the buffer I defined on the driver never gets updated.
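To sanity-check that hypothesis in isolation, here is a small standalone snippet (assuming a local `SparkSession`); as far as I understand Spark's closure handling, each task mutates its own deserialized copy of the buffer, so I would expect the driver-side instance to stay empty:

```scala
import org.apache.spark.sql.SparkSession

// Standalone check of the "copied closure" idea: the buffer captured by the
// closure is serialized per task, so executor-side mutations never reach the
// driver's original instance.
object ClosureCopyCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("closure-copy-check").getOrCreate()
    val sc = spark.sparkContext

    val buffer = scala.collection.mutable.Buffer.empty[Int]
    sc.parallelize(1 to 10).foreach(n => buffer += n) // updates task-local copies only

    println(s"Driver-side buffer after the action: $buffer") // expected: ArrayBuffer()
    spark.stop()
  }
}
```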
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
