'RabbitMQ keeps consuming; artisan command stop after all processed

I'm a bit new to RabbitMQ but understand the concept of AMQP. I've been trying to implement this in Laravel, as all implementations that I've seen are not capable of what I want.

I have an array of queues, these need to be processed one by one by a single artisan command. First the queue1 then queue2 etc. I understand that when you call basic_consume it registers an consumer and starts consuming a message, if not properly closed, the consumer remains active and when starting another basic_consume it creates another consumer. Resulting in splitting the queue amongst two consumers.

In my test Artisan command, I'm trying to clear a single queue and close the consumer after that has happend.

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class AMQPconsume extends Command
{

    protected $signature = 'queues:amqp_consume';

    protected $description = 'Binds all queues to exchanges';

    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');

        $channel = $connection->channel();

        $callback = function ($msg) {
            Log::info('Message proccessed: ' . $msg->body);
            $this->output->writeln( ' [x] Received ' . $msg->body);
        };

        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('update_order_status', '', false, true, false, false, $callback);

        $channel->consume();

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

Few observations:

  1. Nothing is written to the logs
  2. I do get console outputs
  3. The command never closes. It keeps running.

How do I make sure that the command stops and kill/unregisters the consumer after the queue has been processed?



Solution 1:[1]

Solution turns out to be very hard to find, but easy to do...

        while ($channel->is_consuming()) {
            $channel->wait(null, null, 2); // <-- two is the amount of seconds it has to wait before a timeout is called
        }

It will throw an exception then, just try... catch the exception and start over.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Peter Fox