RabbitMQ keeps consuming; Artisan command should stop after all messages are processed
I'm a bit new to RabbitMQ but understand the concept of AMQP. I've been trying to implement this in Laravel, as all implementations that I've seen are not capable of what I want.
I have an array of queues that need to be processed one by one by a single Artisan command: first queue1, then queue2, and so on. I understand that calling basic_consume registers a consumer and starts consuming messages. If that consumer is not properly closed it remains active, and calling basic_consume again creates another consumer, so the queue ends up split between two consumers.
In my test Artisan command, I'm trying to clear a single queue and close the consumer once that has happened.
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class AMQPconsume extends Command
{
    protected $signature = 'queues:amqp_consume';

    protected $description = 'Binds all queues to exchanges';

    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
        $channel = $connection->channel();

        $callback = function ($msg) {
            Log::info('Message processed: ' . $msg->body);
            $this->output->writeln(' [x] Received ' . $msg->body);
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('update_order_status', '', false, true, false, false, $callback);
        $channel->consume();

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}
A few observations:
- Nothing is written to the logs
- I do get console outputs
- The command never closes. It keeps running.
How do I make sure that the command stops and unregisters the consumer after the queue has been processed?
Solution 1:[1]
The solution turned out to be hard to find but easy to implement: pass a timeout to wait().
while ($channel->is_consuming()) {
    $channel->wait(null, null, 2); // <-- 2 is the number of seconds to wait before timing out
}
If no message arrives within that time, wait() throws a PhpAmqpLib\Exception\AMQPTimeoutException. Wrap the loop in try...catch, catch that exception, and start over.
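As a rough sketch of how the timeout and the try...catch might fit together when draining several queues in turn: the script below assumes php-amqplib is installed via Composer, and that two seconds without a message is a good enough sign that a queue is empty (a heuristic, not a guarantee). The `$queues` list and the autoload path are hypothetical; the connection settings and the `update_order_status` queue come from the question.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // assumed Composer autoload path

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// Hypothetical list of queues to drain in order.
$queues = ['update_order_status'];

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, PHP_EOL;
};

foreach ($queues as $queue) {
    // basic_consume returns the consumer tag assigned by the broker.
    // The fourth argument (no_ack) is true, as in the question.
    $consumerTag = $channel->basic_consume($queue, '', false, true, false, false, $callback);

    try {
        while ($channel->is_consuming()) {
            // Third argument: seconds to wait for a message before timing out.
            $channel->wait(null, false, 2);
        }
    } catch (AMQPTimeoutException $e) {
        // No message for 2 seconds: treat this queue as drained (assumption).
    }

    // Unregister this consumer before moving on to the next queue,
    // so it does not keep receiving messages in the background.
    $channel->basic_cancel($consumerTag);
}

$channel->close();
$connection->close();
```

Cancelling the consumer with basic_cancel after each timeout is what keeps a second basic_consume from sharing a queue with a leftover consumer. In an Artisan command, the same loop would go inside handle().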
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Peter Fox |
