Structured Streaming - foreach/foreachBatch not working

I'm using Structured Streaming to read data from Kafka and write to BigQuery (though currently I'm writing to the console). I'm trying to use foreach (or foreachBatch) to transform each record, but I'm running into issues.

Here is the code:

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()

print("df_stream -> ", df_stream, type(df_stream))
#df_stream ->  DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int] <class 'pyspark.sql.dataframe.DataFrame'>

def convertToDict(self,row):
    print(" IN CONVERT TO DICT ", row, " currentTime ", datetime.datetime.now())


query = df_stream.selectExpr("CAST(value AS STRING)").writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("numRows",10)\
    .option("truncate", "false") \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .foreach(convertToDict) \
    .start()


query.awaitTermination()

Error when I use foreach:

22/02/07 12:36:49 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1037, in func_without_process
TypeError: convertToDict() missing 1 required positional argument: 'row'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.PythonForeachWriter.close(PythonForeachWriter.scala:66)
    at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.close(ForeachWriterTable.scala:168)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$9(WriteToDataSourceV2Exec.scala:457)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1518)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

How is the row passed to the function, and how do I access it to do transformations?
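From the docs, my understanding is that foreach calls the function once per row, passing the Row as the single positional argument, so the function must take exactly one parameter. The stray self in convertToDict (copied over from a class method) would explain the "missing 1 required positional argument: 'row'" error. A minimal sketch of what I think should work (I've dropped format("console") here, since foreach itself acts as the sink):

import datetime

# foreach invokes this once per row; the Row is the only argument
def convert_to_dict(row):
    # fields are accessed by name, e.g. row.value for the cast Kafka value
    print(" IN CONVERT TO DICT ", row.value, " currentTime ", datetime.datetime.now())

query = df_stream.selectExpr("CAST(value AS STRING)").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .foreach(convert_to_dict) \
    .start()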

When I swap in foreachBatch in the code above, the epochId (or is it the batchId?) gets printed, as the output below shows. How do I use it to access the rows and make transformations? (A sketch of what I think the signature should be follows the output.)

 IN CONVERT TO DICT  44  currentTime  2022-02-07 12:50:27.074291
 IN CONVERT TO DICT  45  currentTime  2022-02-07 12:50:36.591686
 IN CONVERT TO DICT  46  currentTime  2022-02-07 12:50:40.316790
 IN CONVERT TO DICT  47  currentTime  2022-02-07 12:50:50.322389
 IN CONVERT TO DICT  48  currentTime  2022-02-07 12:51:00.346152
 IN CONVERT TO DICT  49  currentTime  2022-02-07 12:51:10.302129
 IN CONVERT TO DICT  50  currentTime  2022-02-07 12:51:20.350064
 IN CONVERT TO DICT  51  currentTime  2022-02-07 12:51:30.313543
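From the Structured Streaming guide, I believe foreachBatch passes two arguments, the micro-batch DataFrame and the batch id; with my two-parameter convertToDict, self received the DataFrame and row received the id, which would explain why only 44, 45, ... got printed. A sketch, assuming that signature:

import datetime

# foreachBatch passes (batch_df, batch_id); batch_df is a regular,
# non-streaming DataFrame, so the full DataFrame API is available here
def process_batch(batch_df, batch_id):
    print("batch_id:", batch_id, " currentTime ", datetime.datetime.now())
    # collect() is only sensible for small test batches like these
    for row in batch_df.collect():
        print(row.value)

query = df_stream.selectExpr("CAST(value AS STRING)").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .foreachBatch(process_batch) \
    .start()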

Please note:
I need to examine each row and make transformations to it. So, should I be using foreach or foreachBatch?
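Since the eventual sink is BigQuery, I'm leaning toward foreachBatch, so the per-row transformations can be expressed with the DataFrame API on each micro-batch and the result written in bulk. A hedged sketch; the BigQuery connector options (table name, temporary bucket) are placeholders I haven't verified against the spark-bigquery-connector docs:

from pyspark.sql.functions import col, upper

def transform_and_write(batch_df, batch_id):
    # per-row transformation expressed as a column operation on the batch
    transformed = batch_df.withColumn("value_upper", upper(col("value")))
    # hypothetical BigQuery write -- dataset/table and bucket are placeholders
    transformed.write \
        .format("bigquery") \
        .option("table", "my_dataset.my_table") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .mode("append") \
        .save()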


