RabbitMQ bulk consuming messages solution
I'm using RabbitMQ as a queue for different kinds of messages. When I consume these messages with two different consumers from one queue, I process each of them and insert the processing result into a DB:
    def consumer_callback(self, channel, delivery_tag, properties, message):
        result = make_some_processing(message)
        insert_to_db(result)
        channel.basic_ack(delivery_tag)
I want to consume messages from the queue in bulk to reduce the DB load. Since RabbitMQ does not support bulk reading of messages by consumers, I'm going to do something like this:
    some_messages_list = []

    def consumer_callback(self, channel, delivery_tag, properties, message):
        some_messages_list.append((delivery_tag, message))
        if len(some_messages_list) >= 1000:
            results_list = make_some_processing_bulk(some_messages_list)
            insert_to_db_bulk(results_list)
            for tag, _ in some_messages_list:
                channel.basic_ack(tag)
            some_messages_list.clear()
- Messages stay in the queue until they are all fully processed
- If the consumer crashes or disconnects, the messages stay safe
What do you think about that solution? If it's okay, how can I get all the unacknowledged messages back if the consumer crashes?
Solution 1:[1]
I've tested this solution for several months and can say that it works pretty well. Until AMQP provides a feature for bulk consuming, we have to use workarounds like this.
Note: if you decide to use this solution, beware of concurrent consuming with several consumers (threads); use locks (I used Python's threading.Lock) to guarantee that no race conditions happen.
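One way to apply that advice is to move the shared buffer into a small class whose state is guarded by a `threading.Lock`. The sketch below is an assumption, not code from the answer: `process_bulk`, `insert_bulk`, and `ack` are hypothetical injected callables standing in for `make_some_processing_bulk`, `insert_to_db_bulk`, and `channel.basic_ack`. It also uses the AMQP `multiple=True` acknowledgement flag, which acks every message on the channel up to and including the given delivery tag, so it only works if all buffered messages come from a single channel.

```python
import threading


class BatchBuffer:
    """Accumulates (delivery_tag, message) pairs and flushes them in bulk.

    A Lock guards the shared list so several consumer threads can append
    concurrently without racing on the flush.
    """

    def __init__(self, process_bulk, insert_bulk, ack, batch_size=1000):
        self._lock = threading.Lock()
        self._items = []
        self._process_bulk = process_bulk  # stand-in for make_some_processing_bulk
        self._insert_bulk = insert_bulk    # stand-in for insert_to_db_bulk
        self._ack = ack                    # stand-in for channel.basic_ack
        self._batch_size = batch_size

    def add(self, delivery_tag, message):
        with self._lock:
            self._items.append((delivery_tag, message))
            if len(self._items) >= self._batch_size:
                self._flush_locked()

    def _flush_locked(self):
        # Called with the lock held.
        messages = [msg for _, msg in self._items]
        results = self._process_bulk(messages)
        self._insert_bulk(results)
        # Ack only the highest tag with multiple=True: the broker then
        # acknowledges every outstanding message up to that tag.
        self._ack(max(tag for tag, _ in self._items), multiple=True)
        self._items.clear()
```

The consumer callback then shrinks to `buffer.add(delivery_tag, message)`. If messages may arrive on several channels, drop `multiple=True` and ack each tag on its own channel instead.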
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | ????? ???? |
