Cannot access Scala value/variable inside RDD foreach function (Null)

I have a Spark Structured Streaming job that needs to call rdd.foreach inside the foreachBatch function, as in the code below:

val tableName = "ddb_table"

df
    .writeStream
    .foreachBatch { (batchDF: DataFrame, _: Long) =>
      batchDF
        .rdd
        .foreach(
          r => updateDDB(r, tableName, "key")
        )

      curDate = LocalDate.now().toString.replaceAll("-", "/")
      prevDate = LocalDate.now().minusDays(1).toString.replaceAll("-", "/")
    }
    .outputMode(OutputMode.Append)
    .option("checkpointLocation", "checkPointDir")
    .start()
    .awaitTermination()

What happens is that the tableName variable is not recognized inside the rdd.foreach function: the call to the DynamoDB API inside updateDDB raises an exception stating that tableName cannot be null.

The issue is clearly in rdd.foreach and the way it handles variables from the enclosing scope. I have read a bit about broadcast variables, but I don't have enough experience working with RDDs and Spark at this lower level to be sure which approach is right.
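One thing I suspect (but am not sure about): if tableName is actually a field on an enclosing class rather than a local val, the foreach closure captures the whole enclosing object, and the field can come back null on the executors. A sketch of the workaround I have seen suggested, assuming tableName lives on a class, is to copy it into a local val first:

```scala
// Hypothetical sketch: copy the class field into a local val so the
// foreach closure captures only the String, not the enclosing object.
val localTableName = tableName

df
    .writeStream
    .foreachBatch { (batchDF: DataFrame, _: Long) =>
      batchDF
        .rdd
        .foreach(
          // localTableName is serialized with the closure itself
          r => updateDDB(r, localTableName, "key")
        )
    }
```

Would something like this be the correct fix, or is it only masking the problem?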

Some notes:

  1. I need this to be inside the foreachBatch function because I need to update other variables besides this write to DDB (in this case the curDate and prevDate variables).
  2. The code runs successfully when I pass the tableName parameter directly in the function call.
  3. I have one class that extends ForeachWriter that works fine when using foreach instead of foreachBatch, but as stated in point 1 I need the latter because I need to update several things per streaming batch.
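Regarding the broadcast variables I mentioned: this is roughly what I imagine it would look like, though I am not sure it is necessary here. The sketch assumes spark is the active SparkSession:

```scala
// Sketch using a broadcast variable (assuming `spark` is the SparkSession).
// The broadcast value is shipped once to each executor and read via .value.
val tableNameBc = spark.sparkContext.broadcast("ddb_table")

df
    .writeStream
    .foreachBatch { (batchDF: DataFrame, _: Long) =>
      batchDF
        .rdd
        .foreach(
          r => updateDDB(r, tableNameBc.value, "key")
        )
    }
```

Is a broadcast variable overkill for a single small String like this?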


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
