'Azure Service Bus Error: Ensure RequiresSession is set to true when creating a Queue, with Databricks

When attempting to retrieve Queue messages from Azure Service via Databricks I get the following error:

Ensure RequiresSession is set to true when creating a Queue or Subscription to enable sessionful behavior

I have set the Message Queue using Azure's Service Bus Explorer, see below

enter image description here

As you can see I have set the sessionId to carlsession6

Therefore, I'm not sure why I'm getting the error message

The code in databricks is as follows:

session_id = "carlsession6"

connstr = "Endpoint=sb://carlsbus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=myaccesskey"
queue_name = "carlsqueue"

prevTable = None
table = None
rows = []
msgs = []
run = True

batchMsgs = 0

totalMsgs = 0

with ServiceBusClient.from_connection_string(connstr) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
    # Default is None; to receive forever.
    
    while run:
      
      with client.get_queue_receiver(queue_name, session_id=session_id, max_wait_time=5) as receiver:
           
          logDebug(D_LEVEL[1], 'GOT QUEUE RECEIVER')
          rows = []
          msgs = []
    
          i = 0
          for msg in receiver: 
          
            receiver.session.renew_lock()
          
            logDebug(D_LEVEL[1], 'READ MESSAGE')
            msgs.append(msg)
          
            msgJSON = json.loads(str(msg))
          
            if 'table' in msgJSON['control']:
              table = msgJSON['control']['table']
          
            if prevTable is None:
              prevTable = table
                     
            if prevTable != table or batchMsgs >= MAX_BATCH_SIZE:
              writeData(prevTable, schema, rows, msgs)
              rows = []
              msgs = []
              batchMsgs = 0
            
            prevTable = table
          
            i += 1
            batchMsgs += 1
          
            if (i % 100 == 0):
              logInfo('Processed ' + str(i) + ' messages')
          
            if 'operation' in msgJSON['control']:
              if msgJSON['control']['operation'] == "stop_message_processor":
                logInfo('RECEIVED stop_message_processor...')
                run = False
                break
        
            schema, rows = processMsg(msgJSON, rows)    
                            
            logDebug(D_LEVEL[1], 'COMPLETED MESSAGE')
          
            if STOP_AFTER is not None:
              if i >= STOP_AFTER:
                run = False
                break
                
          if len(rows) > 0:  
            writeData(table, schema, rows, msgs)
          elif len(msgs) == 1:
            receiver.complete_message(msg)
     
      logInfo('PROCESSED ' + str(i) + ' MESSAGES FROM QUEUE')
      totalMsgs += i  
      
    logInfo('TOTAL MESSAGES PROCESSED ' + str(totalMsgs))

Any thoughts on how to resolve the error?



Solution 1:[1]

Problem resolved with the following code:

New-AzServiceBusQueue -ResourceGroup myresourcegroup -NamespaceName mynamespace -QueueName myqueue ` -RequiresSession $True

As described at the following link https://docs.microsoft.com/en-us/azure/service-bus-messaging/enable-message-sessions

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Patterson