Spark Structured Streaming foreachBatch to Write Data to a Mounted Blob Storage Container

I am receiving streaming data and want to write it from a Spark Databricks cluster to an Azure Blob Storage container.

To do this, I have mounted my storage account, and I specify the mount path in my streaming sink query.

Method 1

dataframe.writeStream\
    .format("text")\
    .trigger(processingTime='10 seconds')\
    .option("checkpointLocation", "/mnt/Checkpoint")\
    .option("path", "/mnt/Data")\
    .start()

Method 2

def process_row(df, epoch_id):
    try:
        df.write\
            .format("text")\
            .trigger(processingTime='10 seconds')\
            .option("path", "/mnt/Data")\
            .save()
        
    except (Exception) as error:
        print("Received an error ",error)
  

dataframe.writeStream.outputMode('append')\
    .foreachBatch(process_row).option("checkpointLocation", "/mnt/Checkpoint").start()

Using Method 1, I am able to write data to the blob container. With Method 2, however, no data is written to blob storage, and no error is displayed either. The query dashboard for Method 2 shows something like this:

[Image: Method 2 query dashboard]

Am I missing something in Method 2's code?



Solution 1:[1]

I think foreachBatch is meant for cases where you want to, say, process each batch in more than one way or write to more than one location. So if all you want is exception handling, I would just put Method 1 in your try/except.
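
To illustrate the "more than one location" case, here is a minimal sketch (not run against a cluster). It assumes the DataFrame handed to the function is an ordinary batch DataFrame, so the batch writer `.write` is used. The function name `write_two_places` and the second path `/mnt/DataCopy` are hypothetical and only there for illustration.

```python
def write_two_places(batch_df, epoch_id,
                     paths=("/mnt/Data", "/mnt/DataCopy")):  # second path is hypothetical
    # Cache the micro-batch so it is not recomputed for each write.
    batch_df.persist()
    try:
        for path in paths:
            # Batch writer in append mode, so later micro-batches do not
            # fail because the target directory already exists.
            batch_df.write.format("text").mode("append").save(path)
    finally:
        # Release the cached micro-batch even if a write fails.
        batch_df.unpersist()
```

It would be passed as `dataframe.writeStream.foreachBatch(write_two_places)`, with the checkpoint option set on that outer query.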

For curiosity's sake, if you want to use foreachBatch, maybe try keeping just the .writeStream within the function's scope, with no write method outside, so that the write call is left to the foreachBatch function.

Something like this (not tested):

def process_row(df, epoch_id):
    try:
        df.writeStream\
        .format("text")\
        .trigger(processingTime='10 seconds')\
        .option("path", "/mnt/Data")\
        .option("checkpointLocation","/mnt/Checkpoint")\
        .outputMode("append")\
        .start()
   
    
    except (Exception) as error:
        print("Received an error ",error)


dataframe.foreachBatch(process_row)
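
As far as I know, the DataFrame passed into a foreachBatch function is a plain batch DataFrame, and the batch writer has no `.trigger()` method, so that call in Method 2 would probably raise an error that the try/except then hides apart from a printed message. A sketch under that assumption (again not tested on a cluster): the batch write goes inside the function, while the trigger and checkpoint stay on the outer `writeStream`. The names `write_batch` and `start_query` are made up for this example.

```python
def write_batch(batch_df, epoch_id, path="/mnt/Data"):
    # batch_df is assumed to be a regular (non-streaming) DataFrame here,
    # so use the batch writer: .write, no .trigger(), and append mode.
    batch_df.write.format("text").mode("append").save(path)


def start_query(stream_df, checkpoint="/mnt/Checkpoint"):
    # Trigger and checkpoint belong to the streaming query itself,
    # not to the per-batch write.
    return (
        stream_df.writeStream
        .foreachBatch(write_batch)
        .trigger(processingTime="10 seconds")
        .option("checkpointLocation", checkpoint)
        .start()
    )
```

With the question's names this would be started as `start_query(dataframe)`. Leaving out the try/except lets any write error fail the query visibly instead of being swallowed.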

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Zaxier