'StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last)

I'm getting this error msg when running my eventhub receiver notebook and the last code looks like below

streamer = (
spark.readStream.format("eventhubs")
.options(**ehConf)
.load()
.writeStream.foreachBatch(write_to_table)
.option(
    "checkpointLocation",
    eventhub_checkpoint_location,
)
.outputMode("update")
.start()
)
streamer.awaitTermination()

the error message is like below

StreamingQueryException                   Traceback (most recent call last)
<command-3982255611789723> in <module>
     17 # COMMAND ----------
     18 
---> 19 streamer.awaitTermination()

/databricks/spark/python/pyspark/sql/streaming.py in awaitTermination(self, timeout)
    101             return self._jsq.awaitTermination(int(timeout * 1000))
    102         else:
--> 103             return self._jsq.awaitTermination()
    104 
    105     @property

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    131                 # Hide where the exception came from that shows a non-Pythonic
    132                 # JVM exception message.
--> 133                 raise_from(converted)
    134             else:
    135                 raise

/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 206, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 203, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "<command-3982255611789722>", line 57, in write_to_parquet_table
    exceptions_destination_table
  File "/databricks/spark/python/pyspark/sql/readwriter.py", line 868, in saveAsTable
    self._jwrite.saveAsTable(name)
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 133, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: cannot resolve 'data' given input columns: [body, timestamp, exception];

=== Streaming Query ===
Identifier: [id = b90b4921-b795-4e7d-b50d-1483b7956ec8, runId = 0cb697b8-5b15-40ae-a6bb-07e6a8e8a7a4]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.eventhubs.EventHubsSource@f2f568a: {"api_dbwrite":{"8":251,"11":251,"2":244,"5":246,"14":253,"13":250,"4":247,"7":246,"1":245,"10":252,"9":254,"3":244,"12":254,"15":250,"6":246,"0":247}}}

Current State: ACTIVE Thread State: RUNNABLE

Logical Plan: org.apache.spark.sql.eventhubs.EventHubsSource@f2f568a

write_to_table function

def write_to_table(batch_df, epochId):
batch_df.persist()
try:
    
    df = batch_df.withColumn("body", batch_df["body"].cast("string"))
    df = df[["body"]]
    # remove the array from the json if it exists.
    remove_array_udf = udf(remove_array, StringType())
    df = df.withColumn(
    "body",
    remove_array_udf(col("body")),
    )
    body = df.head(1)[0]["body"]
    body_dict=json.loads(body)
    message_type = body_dict['message_type']
    print("message_type:",message_type)
    
    logger.info(f"processing incoming record with message_type: {message_type}")
    if (message_type=='Label'):
        tablename=databricks_destination_database+'.label'
        schema=label_schema
    elif (message_type=="HistogramJobStat"):
        tablename=databricks_destination_database+'.histogram_job_stat'
        schema=histogram_job_stat_schema
    elif (message_type=="JobMetric"):
        tablename=databricks_destination_database+'.job_metric'
        schema=job_metric_schema
    else:
        raise Exception(
            "Invalid message type provided: {}.".format(
                message_type
            )
    )  
    logger.info(f"Destination table: {tablename}")
    logger.info(f"Schema of table: {tablename} is {schema}")
    data=body_dict['data']
    data_df = spark.createDataFrame([data,], ['data',])
    save_eventdf(data_df,tablename,schema)

except:
        from pyspark.sql.functions import lit
        import traceback
        from datetime import datetime
        exception_message_str = traceback.format_exc()
        exp_df = df.withColumn("timestamp", lit(datetime.utcnow()))
        exp_df = exp_df.withColumn("exception", lit(exception_message_str))
        exceptions_destination_table = databricks_destination_database + ".api_exceptions"
        logger.info(f"exceptions_destination_table = {exceptions_destination_table}")
        exp_df.write.format("parquet").mode("append").saveAsTable(
            exceptions_destination_table
        )
        logger.error(exception_message_str)
    # Note: We don't want to shutdown the post processor we log the exception and carry on.
finally:
    batch_df.unpersist()

anyone could help me figure out what's causing the error..thanks!



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source