Structured Streaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
I'm running a Structured Streaming program on GCP Dataproc that reads data from Kafka, does some processing, and writes the processed data back into Kafka. The program was running fine until I killed it (to make a minor change) and then restarted it.
It now fails with the error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
22/02/25 22:14:08 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
java.lang.IllegalStateException: batch 44 doesn't exist
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Traceback (most recent call last):
File "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 609, in <module>
query.awaitTermination()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTermination
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
Question - what is the cause of this error, and how do I debug/fix it? Also, I notice that the checkpoint location occasionally gets corrupted when I do multiple restarts; after the checkpoint is corrupted, the query does not return any records.
For the above issue (as well as when the checkpoint was corrupted), clearing the checkpoint location and restarting the program made it run fine again; a sketch of how the checkpoint location can be cleared is below.
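For reference, this is a minimal sketch of clearing a checkpoint directory from PySpark via the Hadoop FileSystem API; the GCS path below is a placeholder, not the job's actual checkpoint location (the same effect can be achieved with gsutil or the GCS console).

```python
# Minimal sketch (placeholder path): recursively delete the streaming
# checkpoint directory through Spark's Hadoop FileSystem API so the
# query starts from a clean state on the next run.
checkpoint_path = "gs://my-bucket/checkpoints/kafka_to_kafka_query"  # placeholder

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
jvm = spark.sparkContext._jvm
path = jvm.org.apache.hadoop.fs.Path(checkpoint_path)
fs = path.getFileSystem(hadoop_conf)

if fs.exists(path):
    fs.delete(path, True)  # True = recursive delete
```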
Please note: while doing readStream, I've enabled failOnDataLoss=false.
Here is the readStream command, with failOnDataLoss=false:
```python
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 5000) \
    .load()
```
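For completeness, the write side of the query typically looks like the sketch below (the output topic, the df_processed DataFrame, and the checkpoint path are placeholders, not the actual values from the job); the checkpointLocation directory is where Spark keeps the offset and commit logs that the "batch 44 doesn't exist" error refers to.

```python
# Minimal sketch of the write side (placeholder topic / path / DataFrame,
# SSL options omitted for brevity). The checkpointLocation directory holds
# the batch metadata that the error above complains about.
query = df_processed.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("topic", "processed-topic") \
    .option("checkpointLocation", "gs://my-bucket/checkpoints/kafka_to_kafka_query") \
    .start()

query.awaitTermination()
```

Note that once a checkpoint exists at that location, Spark resumes from the offsets recorded there and ignores startingOffsets on restart.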
Any input on this? TIA!
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow