Structured Streaming - foreach/foreachBatch not working
I'm using Structured Streaming to read data from Kafka and write to BigQuery (though currently I'm writing to the console). I'm trying to use foreach (or foreachBatch) to transform each record, but I'm running into issues.
Here is the code:
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()
print("df_stream -> ", df_stream, type(df_stream))
#df_stream -> DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int] <class 'pyspark.sql.dataframe.DataFrame'>
import datetime

def convertToDict(self, row):
    print(" IN CONVERT TO DICT ", row, " currentTime ", datetime.datetime.now())

query = df_stream.selectExpr("CAST(value AS STRING)").writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("numRows", 10) \
    .option("truncate", "false") \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .foreach(convertToDict) \
    .start()

query.awaitTermination()
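As a side note, once rows are flowing, my plan is to parse the Kafka value into columns roughly like this (the payload schema here is hypothetical, just to show the shape of the transformation):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

# hypothetical payload schema -- the real messages have more fields
payload_schema = StructType().add("id", StringType()).add("name", StringType())

parsed = df_stream \
    .selectExpr("CAST(value AS STRING) AS value") \
    .select(from_json(col("value"), payload_schema).alias("data")) \
    .select("data.*")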
Error when I use foreach:
22/02/07 12:36:49 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
out_iter = func(split_index, iterator)
File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1037, in func_without_process
TypeError: convertToDict() missing 1 required positional argument: 'row'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.execution.python.PythonForeachWriter.close(PythonForeachWriter.scala:66)
at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.close(ForeachWriterTable.scala:168)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$9(WriteToDataSourceV2Exec.scala:457)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1518)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
How is the 'row' passed to the function, and how do I access the row to do transformations?
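From the docs, my understanding is that foreach passes each record as a single Row argument to the supplied function (alternatively, foreach accepts an object with open/process/close methods). A minimal sketch of what I think the function should look like, with convertToDict taking only row:

def convertToDict(row):
    # foreach invokes this once per row; 'value' is the column
    # produced by CAST(value AS STRING) in the selectExpr above
    record = row.asDict()
    print("record:", record, " currentTime ", datetime.datetime.now())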
When I use foreachBatch instead (in the code above), the epochId (or is it the batchId?) gets printed, as shown in the output below; a sketch of what I think foreachBatch expects follows the output. How do I use this to access the rows and make transformations?
IN CONVERT TO DICT 44 currentTime 2022-02-07 12:50:27.074291
IN CONVERT TO DICT 45 currentTime 2022-02-07 12:50:36.591686
IN CONVERT TO DICT 46 currentTime 2022-02-07 12:50:40.316790
IN CONVERT TO DICT 47 currentTime 2022-02-07 12:50:50.322389
IN CONVERT TO DICT 48 currentTime 2022-02-07 12:51:00.346152
IN CONVERT TO DICT 49 currentTime 2022-02-07 12:51:10.302129
IN CONVERT TO DICT 50 currentTime 2022-02-07 12:51:20.350064
IN CONVERT TO DICT 51 currentTime 2022-02-07 12:51:30.313543
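From the docs, my understanding is that foreachBatch calls the function with two arguments, the micro-batch DataFrame and the batch id; with my (self, row) signature the DataFrame would land in self and the id in row, which might explain the numbers above. A minimal sketch of the signature I believe foreachBatch expects (the body is just an example transformation):

def processBatch(batch_df, batch_id):
    # foreachBatch hands over the micro-batch as a regular
    # DataFrame plus a monotonically increasing batch id
    print("processing batch", batch_id)
    batch_df.selectExpr("CAST(value AS STRING) AS value").show(10, truncate=False)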
Please note:
I need to examine each row and make transformations to it.
So, should I be using foreach or foreachBatch?
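For context, the eventual sink is BigQuery, so I imagine the foreachBatch version would look roughly like the sketch below; it assumes the spark-bigquery-connector is available, and the table and bucket names are placeholders of mine:

def writeToBigQuery(batch_df, batch_id):
    # assumes spark-bigquery-connector is on the classpath;
    # table and bucket names are placeholders
    batch_df.write.format("bigquery") \
        .option("table", "my_dataset.my_table") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .mode("append") \
        .save()

query = df_stream.selectExpr("CAST(value AS STRING)").writeStream \
    .foreachBatch(writeToBigQuery) \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .start()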