How to implement a synchronous producer call in RabbitMQ
I am currently looking to develop a synchronous workflow in a RabbitMQ producer-consumer setup. The producer publishes a message to the queue, the consumer takes about 6-8 seconds to process it and then returns the result to the producer. The producer receives the result and continues executing the rest of the code, which depends on that result.
I understand message queues are designed to be asynchronous, but is there any way to make the flow producer call -> consumer process -> consumer returns result to producer synchronous? I searched around and found the RPC tutorial in the RabbitMQ docs, which uses a busy loop to do the waiting on the producer side:
rpc_client.py (producer)
#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
rpc_server.py (consumer)
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()
But I do not think the busy loop while self.response is None: is a very good idea, since it puts a heavy load on the server running this code. I tried acquiring a lock on the producer side and sending the lock object to the consumer, so the consumer could release it when processing is done. However, Pika, the RabbitMQ client for Python, does not allow passing lock objects through the queue, since message bodies must be serializable.
What will be an elegant way to avoid busy-wait to achieve the synchronous workflow here?
Solution 1:[1]
The consumer should publish the result elsewhere, and the producer can wait for that result to become available. Note that storing the results in an AMQP message broker is generally a bad idea.
That's the design of result backends in Celery, so I'd suggest you do something very similar, if not install Celery right away and drop the manual usage of Pika.
https://docs.celeryq.dev/en/latest/userguide/tasks.html#result-backends
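As a minimal sketch of the result-backend idea (standard-library only, no broker; the names here are illustrative, not from Celery or Pika): the consumer stores each result in a shared store keyed by correlation id, and the producer blocks on an event instead of busy-waiting. In a real deployment the store would be Redis, a database, etc., as Celery's result backends do.

```python
import threading
import uuid

# Hypothetical in-process "result backend": correlation id -> entry.
_results = {}
_results_lock = threading.Lock()


def create_pending(corr_id):
    """Producer side: register interest in a result before publishing."""
    entry = {"event": threading.Event(), "value": None}
    with _results_lock:
        _results[corr_id] = entry
    return entry


def store_result(corr_id, value):
    """Consumer side: store the result and wake the waiting producer."""
    with _results_lock:
        entry = _results[corr_id]
    entry["value"] = value
    entry["event"].set()


def wait_for_result(corr_id, timeout=10.0):
    """Producer side: block (no busy loop) until the result arrives."""
    with _results_lock:
        entry = _results[corr_id]
    if not entry["event"].wait(timeout):
        raise TimeoutError("no result for %s" % corr_id)
    return entry["value"]


if __name__ == "__main__":
    corr_id = str(uuid.uuid4())
    create_pending(corr_id)
    # Simulate the consumer finishing its work on another thread.
    threading.Thread(target=store_result, args=(corr_id, 832040)).start()
    print(wait_for_result(corr_id))  # fib(30) = 832040
```

The key point is that `Event.wait()` sleeps until signalled, so the producer consumes no CPU while the consumer works.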
Solution 2:[2]
The code you posted is the correct way to do what you want to do. If you have a problem with the while loop, I suggest looking into something like asyncio's event loop. You could create a future for the response and only resolve that future when the response comes back on the callback queue. But at the end of the day it's the same thing: the event loop is a fancy while True loop.
I also caution against doing something like this. Fair warning: there are many things to consider if you're building a proper system...
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | N1ngu |
| Solution 2 | testfile |
