How to writeStream Azure Service Bus Queues in Apache Spark with Databricks
I have used the following link to receive messages from Azure Service Bus queues: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-python-how-to-use-queues
I am only focusing on the following code (written in Databricks) to receive messages from the queue:
with servicebus_client:
    # get the Queue Receiver object for the queue
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            # complete the message so that the message is removed from the queue
            receiver.complete_message(msg)
When I manually send a message to the queue (This Is a Message), it is successfully received by Databricks.
When I collect the output of the msg using str(msg) I get:
Out[77]: 'This Is a Message'
When I collect the whole output of the ServiceBusReceivedMessage, I get the following output:
Out[76]: ServiceBusReceivedMessage(body=This Is a Message, application_properties={b'MachineName': b'DESKTOP-GV10S60', b'UserName': b'Carlton'}, session_id=None, message_id=d7ff2f86-dc6d-483c-85d2-bb6eebd5de51, content_type=None, correlation_id=None, to=None, reply_to=None, reply_to_session_id=None, subject=Service Bus Explorer, time_to_live=2 days, 2:00:10, partition_key=None, scheduled_enqueue_time_utc=0001-01-01 00:00:00+00:00, auto_renew_error=None, dead_letter_error_description=None, dead_letter_reason=None, dead_letter_source=None, delivery_count=0, enqueued_sequence_number=None, enqueued_time_utc=2022-02-05 12:06:46.154000+00:00, expires_at_utc=2022-02-07 14:06:56.154000+00:00, sequence_number=97, lock_token=None, locked_until_utc=None)
You will notice the body=This Is a Message.
I am trying to writeStream the body of the message to a location on either DBFS or Azure Data Lake.
I have tried the following:
streamingQuery = (
    receiver
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "...")
    .start()
)
But I get the error AttributeError: 'ServiceBusReceiver' object has no attribute 'writeStream'
I have also tried:
streamingQuery = (
    str(msg)
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "...")
    .start()
)
But I get the error AttributeError: 'str' object has no attribute 'writeStream'
I have been working on this for quite some time.
Could someone let me know where I might be going wrong?
Or could someone at least tell me whether what I'm doing is even possible? That would allow me to work on another solution instead of wasting my time trying to get this to work.
Thanks in advance
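For context on the two errors: `writeStream` is an attribute of a Spark streaming DataFrame, while `receiver` is an Azure SDK object and `str(msg)` is a plain Python string, so neither has that attribute. One possible workaround, sketched here under assumptions rather than as a definitive solution, is to pull messages in batches with the SDK and append their bodies to Delta with an ordinary batch write. The names `drain_queue`, `write_batch`, `write_bodies_to_delta` and the path `/tmp/servicebus_bodies` are hypothetical, and the sketch assumes azure-servicebus v7 (`receive_messages`, `complete_message`) plus the `spark` session that Databricks notebooks provide.

```python
def drain_queue(receiver, write_batch, batch_size=50, max_wait_time=5):
    """Receive messages in batches, persist their bodies, then complete them.

    `receiver` is assumed to behave like azure-servicebus v7's
    ServiceBusReceiver. `write_batch` is a hypothetical callable that
    persists a list of body strings.
    """
    total = 0
    while True:
        messages = receiver.receive_messages(
            max_message_count=batch_size, max_wait_time=max_wait_time
        )
        if not messages:
            break  # nothing arrived within max_wait_time; end this run
        # str(msg) returned the body text in the question above
        bodies = [str(m) for m in messages]
        write_batch(bodies)  # persist first ...
        for m in messages:
            receiver.complete_message(m)  # ... then remove from the queue
        total += len(messages)
    return total


def write_bodies_to_delta(bodies, path="/tmp/servicebus_bodies"):
    """Hypothetical sink: append bodies to a Delta table with a batch write.

    Uses the `spark` session available in a Databricks notebook; the
    default path is a placeholder, not a value from the question.
    """
    df = spark.createDataFrame([(b,) for b in bodies], ["body"])
    df.write.format("delta").mode("append").save(path)
```

In a notebook this might be called as `drain_queue(receiver, write_bodies_to_delta)` inside the `with receiver:` block. Completing messages only after the write means a failed write leaves them on the queue to be redelivered, at the cost of possible duplicates.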
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
