How to writeStream Azure Service Bus Queues in Apache Spark with Databricks

I have used the following link to receive messages from Azure Service Bus queues: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-python-how-to-use-queues

I am only focusing on the following code (written in Databricks) to receive messages from the queue:

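from azure.servicebus import ServiceBusClient

# CONNECTION_STR and QUEUE_NAME are defined earlier, as in the Microsoft tutorial
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

with servicebus_client:
    # get the Queue Receiver object for the queue
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            # complete the message so that the message is removed from the queue
            receiver.complete_message(msg)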
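The loop above prints and completes each message but keeps nothing. As a first step toward persisting the messages, here is a minimal sketch that drains the queue in batches and returns the bodies as a list of strings. It assumes the v7 `azure-servicebus` receiver API (`receive_messages(max_message_count=..., max_wait_time=...)` and `complete_message(msg)`); the function name `receive_bodies` and the default batch size are hypothetical choices.

```python
def receive_bodies(receiver, max_message_count=10, max_wait_time=5):
    """Pull messages in batches until none arrive; return their bodies as strings.

    Assumes `receiver` behaves like azure.servicebus.ServiceBusReceiver (v7),
    i.e. it provides receive_messages() and complete_message().
    """
    bodies = []
    while True:
        batch = receiver.receive_messages(
            max_message_count=max_message_count, max_wait_time=max_wait_time
        )
        if not batch:
            break  # no message arrived within max_wait_time
        for msg in batch:
            bodies.append(str(msg))  # str() on a received message gives its body text
            receiver.complete_message(msg)  # remove the message from the queue
    return bodies
```

Inside the `with receiver:` block, this would be called as `bodies = receive_bodies(receiver)`. Because each message is completed as soon as it is read, a failure while writing the bodies afterwards would lose them; if that matters, complete the messages only after the write succeeds.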

When I manually send a message ("This Is a Message") to the queue, it is successfully received by Databricks.

When I inspect the message using str(msg), I get:

Out[77]: 'This Is a Message'

When I inspect the whole ServiceBusReceivedMessage object, I get the following output:

Out[76]: ServiceBusReceivedMessage(body=This Is a Message, application_properties={b'MachineName': b'DESKTOP-GV10S60', b'UserName': b'Carlton'}, session_id=None, message_id=d7ff2f86-dc6d-483c-85d2-bb6eebd5de51, content_type=None, correlation_id=None, to=None, reply_to=None, reply_to_session_id=None, subject=Service Bus Explorer, time_to_live=2 days, 2:00:10, partition_key=None, scheduled_enqueue_time_utc=0001-01-01 00:00:00+00:00, auto_renew_error=None, dead_letter_error_description=None, dead_letter_reason=None, dead_letter_source=None, delivery_count=0, enqueued_sequence_number=None, enqueued_time_utc=2022-02-05 12:06:46.154000+00:00, expires_at_utc=2022-02-07 14:06:56.154000+00:00, sequence_number=97, lock_token=None, locked_until_utc=None)

You will notice the body=This Is a Message.
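Since the body is available through `str(msg)` and the other fields are plain attributes (`message_id` and `sequence_number` appear in the output above), each message can be flattened into a tuple before it goes anywhere near Spark. A minimal sketch; the helper name `message_to_row` and the choice of fields are assumptions for illustration.

```python
def message_to_row(msg):
    """Flatten selected fields of a received message into a (message_id, sequence_number, body) tuple.

    Assumes `msg` exposes message_id, sequence_number and __str__ like
    azure.servicebus.ServiceBusReceivedMessage.
    """
    return (
        msg.message_id,       # e.g. 'd7ff2f86-dc6d-483c-85d2-bb6eebd5de51'
        msg.sequence_number,  # integer assigned by Service Bus, e.g. 97
        str(msg),             # the body text, e.g. 'This Is a Message'
    )
```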

I am trying to writeStream the body of the message to a location on either DBFS or Azure Data Lake.

I have tried the following:

streamingQuery = (
  receiver
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "...")
  .start()
)

But I get the error AttributeError: 'ServiceBusReceiver' object has no attribute 'writeStream'

I have also tried

streamingQuery = (
  str(msg)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "...")
  .start()
)

But I get the error AttributeError: 'str' object has no attribute 'writeStream'
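Both errors have the same cause: `writeStream` is a property of a streaming Spark DataFrame (one created with `spark.readStream`), and neither a `ServiceBusReceiver` nor a `str` is a DataFrame. The Service Bus Python SDK is not a Structured Streaming source, so one possible workaround is to collect the messages with the SDK, build a DataFrame from them, and append it with the batch writer `DataFrame.write` instead. A minimal sketch, assuming `spark` is the SparkSession that Databricks predefines and that Delta Lake is available; the function name `write_rows_to_delta` and the three-column schema are hypothetical.

```python
def write_rows_to_delta(spark, rows, path):
    """Append (message_id, sequence_number, body) rows to a Delta table at `path`.

    `spark` is assumed to be an active SparkSession; `rows` is a list of tuples.
    Returns the number of rows written.
    """
    if not rows:
        return 0  # nothing received; skip creating an empty commit
    # A DDL-style schema string avoids relying on schema inference
    df = spark.createDataFrame(
        rows, schema="message_id string, sequence_number bigint, body string"
    )
    # DataFrame.write is the batch counterpart of DataFrame.writeStream
    df.write.format("delta").mode("append").save(path)
    return len(rows)
```

For example, `write_rows_to_delta(spark, rows, "/mnt/datalake/servicebus_messages")`, where the mount path is a hypothetical DBFS or Azure Data Lake location. No checkpoint location is needed because this is a batch write, not a streaming query.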

I have been working on this for quite some time.

Can someone tell me where I might be going wrong?

Even just knowing whether what I'm doing is possible would help. That would at least allow me to work on another solution instead of spending more time trying to get this to work.

Thanks in advance



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
