Cannot access Scala value/variable inside RDD foreach function (null)
I have a Spark Structured Streaming job that needs to call rdd.foreach inside the foreachBatch function, as in the code below:

    val tableName = "ddb_table"

    df
      .writeStream
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        batchDF
          .rdd
          .foreach(
            r => updateDDB(r, tableName, "key")
          )
        curDate = LocalDate.now().toString.replaceAll("-", "/")
        prevDate = LocalDate.now().minusDays(1).toString.replaceAll("-", "/")
      }
      .outputMode(OutputMode.Append)
      .option("checkpointLocation", "checkPointDir")
      .start()
      .awaitTermination()

The tableName variable does not seem to be visible inside rdd.foreach: the DynamoDB API call inside updateDDB throws an exception stating that tableName cannot be null.
The issue seems to lie in rdd.foreach and the way it handles variables from the enclosing scope. I have read a bit about broadcast variables, but I don't have enough experience working with RDDs and Spark at this lower level to know whether they are the right approach.
Some notes:
- I need this to be inside the foreachBatch function because I need to update other variables apart from this write to DDB (in this case the curDate and prevDate variables).
- The code runs successfully when I pass the tableName value directly in the function call.
- I have a class that extends ForeachWriter that works fine when using foreach instead of foreachBatch, but as stated in the first note, I need to use the latter because I need to update several things once per streaming batch.
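A minimal sketch of one thing to try, under assumptions the question does not confirm: if tableName is a field of an enclosing object (for example one that extends App, whose fields are initialized lazily), the executors may see it as null, and copying it into a local val before calling rdd.foreach lets the closure capture the string itself. The object name DdbStreamSketch, the updateDDB stub, and the built-in "rate" test source are hypothetical stand-ins, not the original job.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.streaming.OutputMode

object DdbStreamSketch {
  // Hypothetical stand-in for the question's updateDDB; the real one calls DynamoDB.
  def updateDDB(row: Row, tableName: String, key: String): Unit = {
    require(tableName != null, "tableName cannot be null")
    // ... DynamoDB update would go here ...
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ddb-sketch").getOrCreate()

    // Defined inside main (not as an object field), so it is a plain local value.
    val tableName = "ddb_table"

    // Assumption: any streaming source; "rate" is used only to make the sketch complete.
    val df = spark.readStream.format("rate").load()

    // Giving the handler an explicit Scala function type avoids the overload
    // ambiguity between the Scala and Java variants of foreachBatch.
    val handler: (DataFrame, Long) => Unit = (batchDF, _) => {
      val table = tableName // local copy: the closure below captures only this String
      batchDF.rdd.foreach(r => updateDDB(r, table, "key"))
      // Driver-side updates (such as curDate and prevDate) can still happen here.
    }

    df.writeStream
      .foreachBatch(handler)
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", "checkPointDir")
      .start()
      .awaitTermination()
  }
}
```

A broadcast variable is generally meant for large read-only data shared across tasks; for a single short string, capturing a local copy as above is usually enough.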
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
