Stream-Static Join: Error Handling When the HDFS File Does Not Exist
I'm building a Spark Structured Streaming application (Spark version 2.4) that performs a stream-static join and refreshes the static DataFrame from HDFS every 30 minutes, following the rate-stream approach described here: https://stackoverflow.com/a/66451431/18164247. The problem is that the HDFS file gets deleted once every hour (which I have no control over), so I get a java.io.FileNotFoundException during that window. I already have error handling in the foreachBatch method that refreshes staticDf, but the error seems to be thrown during the stream-static join instead.
The refresh of staticDf is triggered every 30 minutes, and staticDf is persisted after each refresh. My understanding is that, during those 30 minutes, join_stream should use the persisted staticDf without needing to read from HDFS again. Why am I still getting the HDFS file-not-found error?
In addition, I want to keep the executors from failing on this FileNotFoundException and instead continue processing by creating an empty staticDf and joining it with the Kafka data. How can I handle the error in this scenario? Any ideas or suggestions are appreciated!
Code:
import java.util.Calendar
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._ // needed for .as[Long]

// Read source data from Kafka
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaSourceTopic)
  .option("kafka.security.protocol", kafkaSecurity)
  .option("startingOffsets", kafkaStartingOffsets)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", kafkaMaxOffsetPerTrigger)
  .load()
  .transform(transform_func)

// Dummy stream to trigger streaming query
val staticRefreshStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load()
  .selectExpr("CAST(value as LONG) as trigger")
  .as[Long]

// Load the static data; fall back to an empty DataFrame if the load throws
def loadHDFS(): DataFrame = {
  Try(spark.read.format("parquet").load(hdfsPath).transform(transform_func)) match {
    case Success(value) => value
    case Failure(e) =>
      // printStackTrace() returns Unit, so it is called on its own line instead of being concatenated into the message
      logger.info("Loading HDFS data failed, creating an empty DataFrame. Error details: " + e.toString)
      e.printStackTrace()
      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], staticSchema)
  }
}

// HDFS first load
var staticDf = loadHDFS()
staticDf.persist()

// Reload and re-persist the static DataFrame on every trigger of the dummy stream
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long): Unit = {
  staticDf.unpersist()
  staticDf = loadHDFS()
  staticDf.persist()
  logger.info(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe")
}

// Join Kafka data with HDFS data
val joinDf = df
  .join(staticDf, col("col1") === col("col2"), "left")
  .transform(transform_func)
  .toJSON

/** Sink data */
// Cluster output to Kafka
joinDf.writeStream
  .format("kafka")
  .outputMode("update")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("topic", kafkaDestTopic)
  .queryName("join_stream")
  .option("kafka.security.protocol", kafkaSecurity)
  .start()

// Trigger the refresh every 30 minutes
staticRefreshStream.writeStream
  .outputMode("append")
  .foreachBatch(foreachBatchMethod[Long] _)
  .queryName("refresh_stream")
  .trigger(Trigger.ProcessingTime("30 minutes"))
  .start()

spark.streams.awaitAnyTermination()
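
One possible direction for the error-handling part, sketched under assumptions rather than as a verified fix: `load()` and `persist()` are both lazy with respect to the file contents, so in the code above the Parquet data is first read when the join executes, outside the `Try`. Running an action such as `count()` inside the `Try` forces the read and fills the cache at refresh time, where a failure can still be caught. The names `StaticLoader`, `loadEagerly`, and `fallbackSchema` are hypothetical; `path` and `fallbackSchema` stand in for `hdfsPath` and `staticSchema`.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType
import scala.util.{Failure, Try}

// Hypothetical helper, not part of the original code.
object StaticLoader {

  // Loads a Parquet path and materializes it into the cache inside a Try.
  // Returns an empty DataFrame with `fallbackSchema` if defining or reading
  // the data fails with a non-fatal exception.
  def loadEagerly(spark: SparkSession, path: String, fallbackSchema: StructType): DataFrame = {
    val attempt: Try[DataFrame] =
      Try(spark.read.format("parquet").load(path)).flatMap { df =>
        val cached = df.persist()
        Try {
          cached.count() // action: reads the files now and populates the cache
          cached
        }.recoverWith { case e: Throwable =>
          cached.unpersist() // do not leave a half-built cache entry behind
          Failure[DataFrame](e)
        }
      }

    attempt.getOrElse(
      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], fallbackSchema)
    )
  }
}
```

With this, `loadHDFS()` could delegate to `StaticLoader.loadEagerly(spark, hdfsPath, staticSchema)` and the separate `staticDf.persist()` calls would no longer be needed. Two caveats apply: even with a populated cache, Spark may recompute evicted or lost partitions from the source files, so a FileNotFoundException can still surface if the files are gone by then; and this sketch does not address whether the already-started join_stream picks up the reassigned `staticDf`.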
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
