'StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last)
I'm getting this error message when running my Event Hubs receiver notebook. The relevant code looks like this:
# Build the Event Hubs source stream first, then attach the sink.
source_df = spark.readStream.format("eventhubs").options(**ehConf).load()

# Hand every micro-batch to write_to_table; checkpoint so the query can
# resume from its last committed offsets after a restart.
streamer = (
    source_df.writeStream
    .foreachBatch(write_to_table)
    .option("checkpointLocation", eventhub_checkpoint_location)
    .outputMode("update")
    .start()
)

# Block this notebook cell until the streaming query stops (or raises).
streamer.awaitTermination()
The full error message is:
StreamingQueryException Traceback (most recent call last)
<command-3982255611789723> in <module>
17 # COMMAND ----------
18
---> 19 streamer.awaitTermination()
/databricks/spark/python/pyspark/sql/streaming.py in awaitTermination(self, timeout)
101 return self._jsq.awaitTermination(int(timeout * 1000))
102 else:
--> 103 return self._jsq.awaitTermination()
104
105 @property
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
131 # Hide where the exception came from that shows a non-Pythonic
132 # JVM exception message.
--> 133 raise_from(converted)
134 else:
135 raise
/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)
StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/pyspark/sql/utils.py", line 206, in call
raise e
File "/databricks/spark/python/pyspark/sql/utils.py", line 203, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "<command-3982255611789722>", line 57, in write_to_parquet_table
exceptions_destination_table
File "/databricks/spark/python/pyspark/sql/readwriter.py", line 868, in saveAsTable
self._jwrite.saveAsTable(name)
File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/databricks/spark/python/pyspark/sql/utils.py", line 133, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: cannot resolve 'data' given input columns: [body, timestamp, exception];
=== Streaming Query ===
Identifier: [id = b90b4921-b795-4e7d-b50d-1483b7956ec8, runId = 0cb697b8-5b15-40ae-a6bb-07e6a8e8a7a4]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.eventhubs.EventHubsSource@f2f568a: {"api_dbwrite":{"8":251,"11":251,"2":244,"5":246,"14":253,"13":250,"4":247,"7":246,"1":245,"10":252,"9":254,"3":244,"12":254,"15":250,"6":246,"0":247}}}
Current State: ACTIVE Thread State: RUNNABLE
Logical Plan: org.apache.spark.sql.eventhubs.EventHubsSource@f2f568a
The write_to_table function is:
def write_to_table(batch_df, epochId):
    """Route one micro-batch from Event Hubs to its destination table.

    Parameters
    ----------
    batch_df : pyspark.sql.DataFrame
        Micro-batch delivered by ``foreachBatch``; expected to carry a
        binary ``body`` column with a JSON payload containing
        ``message_type`` and ``data`` keys.
    epochId : int
        Batch id supplied by Structured Streaming (unused here).

    On failure the exception text (plus the rows being processed) is
    appended to the ``api_exceptions`` table and processing continues —
    a bad record must not stop the stream.
    """
    batch_df.persist()
    # Keep a handle for the exception path even if parsing fails before
    # `df` is assigned; otherwise the handler itself raises NameError and
    # masks the real error.
    df = None
    try:
        df = batch_df.withColumn("body", batch_df["body"].cast("string"))
        df = df.select("body")
        # Remove the wrapping array from the JSON if it exists.
        remove_array_udf = udf(remove_array, StringType())
        df = df.withColumn("body", remove_array_udf(col("body")))

        # Empty micro-batches are normal; `head(1)[0]` on an empty frame
        # would raise IndexError, so bail out early instead.
        first_rows = df.head(1)
        if not first_rows:
            logger.info("Empty micro-batch; nothing to write.")
            return

        # NOTE(review): only the FIRST record of the batch is parsed and
        # written — confirm each batch carries exactly one message,
        # otherwise the remaining records are silently dropped.
        body = first_rows[0]["body"]
        body_dict = json.loads(body)
        message_type = body_dict["message_type"]
        print("message_type:", message_type)
        logger.info(f"processing incoming record with message_type: {message_type}")

        # Dispatch on message_type to pick the destination table + schema.
        if message_type == "Label":
            tablename = databricks_destination_database + ".label"
            schema = label_schema
        elif message_type == "HistogramJobStat":
            tablename = databricks_destination_database + ".histogram_job_stat"
            schema = histogram_job_stat_schema
        elif message_type == "JobMetric":
            tablename = databricks_destination_database + ".job_metric"
            schema = job_metric_schema
        else:
            raise Exception(
                "Invalid message type provided: {}.".format(message_type)
            )

        logger.info(f"Destination table: {tablename}")
        logger.info(f"Schema of table: {tablename} is {schema}")
        data = body_dict["data"]
        data_df = spark.createDataFrame([data,], ["data",])
        save_eventdf(data_df, tablename, schema)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate and can stop the stream.
        from pyspark.sql.functions import lit
        import traceback
        from datetime import datetime

        exception_message_str = traceback.format_exc()
        # Fall back to the raw batch if the failure happened before `df`
        # was built.
        base_df = df if df is not None else batch_df
        exp_df = base_df.withColumn("timestamp", lit(datetime.utcnow()))
        exp_df = exp_df.withColumn("exception", lit(exception_message_str))
        exceptions_destination_table = databricks_destination_database + ".api_exceptions"
        logger.info(f"exceptions_destination_table = {exceptions_destination_table}")
        # NOTE(review): the reported AnalysisException ("cannot resolve
        # 'data' given input columns: [body, timestamp, exception]") comes
        # from THIS write — the existing api_exceptions table has a `data`
        # column that exp_df lacks. Align exp_df's columns with the
        # table's schema (or recreate the table) to fix the failure.
        exp_df.write.format("parquet").mode("append").saveAsTable(
            exceptions_destination_table
        )
        logger.error(exception_message_str)
        # We deliberately don't shut down the stream: log and carry on.
    finally:
        batch_df.unpersist()
Could anyone help me figure out what's causing this error? Thanks!
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
