How to trigger the Block and Unblock events in RabbitMQ?

I want to know when RabbitMQ blocks or unblocks a connection, so I added the following to my application:

1- Added a BlockedListener to the RabbitMQ connection and implemented the handleBlocked() and handleUnblocked() methods.

2- Tried to make the broker run low on resources by sending 100k messages of 1 MB each.

3- Added a delay inside the consumer so that the messages cannot be consumed quickly.

But handleBlocked() is still never called.

Here is my code:

Producer

@Component
class Producer {
    private RabbitTemplate template;

    @Autowired
    public Producer(RabbitTemplate template) {
        this.template = template;
    }

    public void sendMessage(String exchange, String routingKey, String message) {
        this.template.convertAndSend(exchange, routingKey, message);
    }
}

Consumer

@Component
class Consumer {
    
    @RabbitListener(queues = "${apress.amqp.queue}")
    public void process(String message) throws InterruptedException {
        
        Thread.sleep(5000L);
    }
}

Configuration


@Configuration
class AMQPConfig {
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        template.execute(new ChannelCallback<Object>() {

            @Override
            public Object doInRabbit(Channel channel) throws Exception {

                channel.getConnection().addBlockedListener(new BlockedListener() {
                    
                    
                    public void handleUnblocked() throws IOException {
                        
                        System.out.println("UnBlocked");
                    }

                    public void handleBlocked(String reason) throws IOException {
                        
                        System.out.println("Blocked");
                    }
                });
                
                // This event is triggered when the RabbitMQ service is stopped.
                channel.getConnection().addShutdownListener(new ShutdownListener() {
                    public void shutdownCompleted(ShutdownSignalException cause) {
                        
                        System.out.println("Rabbit Shutdown");
                    }
                });

                return null;
            }

        });
        
        return template;
    }
    

    // This bean creates the queue programmatically.
    @Bean
    public Queue queue(@Value("${apress.amqp.queue}") String queueName) {
        return new Queue(queueName, false);
    }
    
}

Application Main

@SpringBootApplication
public class AmqpDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(AmqpDemoApplication.class, args);
    }

    @Bean
    CommandLineRunner simple(@Value("${apress.amqp.exchange:}") String exchange,
            @Value("${apress.amqp.queue}") String routingKey, Producer producer) {
        return args -> {
            for(int i = 0; i < 100000; i++) {
                // Send large messages to consume broker memory and
                // push the queue into the "flow" state.
                String messageOfSize1MB = ExamplesUtil.createDataSize(1024);
                producer.sendMessage(exchange, routingKey, messageOfSize1MB);
            }
        };
    }
}

Util class

class ExamplesUtil {

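    /**
     * Creates a message of size msgSize in KB, measured as UTF-8 encoded bytes.
     */
    public static String createDataSize(int msgSize) {
        // Each ASCII char encodes to one byte in UTF-8 (the default charset of
        // Spring AMQP's SimpleMessageConverter), so the count is not halved,
        // even though a Java char is 2 bytes in memory.
        msgSize = msgSize * 1024;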
        StringBuilder sb = new StringBuilder(msgSize);
        for (int i = 0; i < msgSize; i++) {
            sb.append('a');
        }
        return sb.toString();
    }
}
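
The size of a String payload on the wire depends on the charset used by the message converter, not on the in-memory width of a Java char. A minimal sketch for measuring it, assuming UTF-8 encoding (the default of Spring AMQP's SimpleMessageConverter); the class and method names here are hypothetical and not part of the original code:

```java
import java.nio.charset.StandardCharsets;

class PayloadSizeCheck {

    /** Builds a string made of the given number of ASCII characters. */
    static String asciiPayload(int chars) {
        StringBuilder sb = new StringBuilder(chars);
        for (int i = 0; i < chars; i++) {
            sb.append('a');
        }
        return sb.toString();
    }

    /** Returns the number of bytes the string occupies once encoded as UTF-8. */
    static int utf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // 512 * 1024 ASCII chars encode to 512 KB in UTF-8, not 1 MB.
        String payload = asciiPayload(512 * 1024);
        System.out.println(utf8Bytes(payload) + " bytes");
    }
}
```

Logging the encoded size once before the send loop shows how much data each message really carries.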

application.properties

apress.amqp.queue=spring-boot-queue
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.javaworld</groupId>
    <artifactId>code-snippet</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>   

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

    </dependencies>
</project>


Solution 1:[1]

On the broker host, run rabbitmqctl set_vm_memory_high_watermark 0.000001. With this you should be able to test the blocked event on existing connections.
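
To see why such a tiny value helps, here is a rough arithmetic sketch. It assumes the watermark is a fraction of the RAM the broker detects, and it uses a hypothetical machine with 16 GiB of RAM; the class and method names are illustrative only:

```java
class WatermarkThreshold {

    /** Approximate memory threshold in bytes for a relative watermark. */
    static long thresholdBytes(long totalRamBytes, double fraction) {
        return (long) (totalRamBytes * fraction);
    }

    public static void main(String[] args) {
        long ram = 16L * 1024 * 1024 * 1024; // hypothetical 16 GiB host

        // A watermark of 0.4 allows several GiB before the alarm fires.
        System.out.println("0.4      -> " + thresholdBytes(ram, 0.4) + " bytes");

        // A watermark of 0.000001 allows only a few KB, far less than a
        // running broker already uses, so the alarm fires immediately.
        System.out.println("0.000001 -> " + thresholdBytes(ram, 0.000001) + " bytes");
    }
}
```

Under these assumptions the lowered threshold is about 17 KB, so the broker is over the limit as soon as the command takes effect, without having to fill the queue.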

According to the documentation, the default value is 0.4 (40%), so once you are done testing, run rabbitmqctl set_vm_memory_high_watermark 0.4. This should then unblock the connection.
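
The lower-then-restore procedure can also be scripted so the watermark is always reset after a test. This is only a sketch: it assumes rabbitmqctl is on the PATH of the machine running it and that the user has permission to manage the broker. The class name, helper names, and the 30-second pause are hypothetical:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

class WatermarkToggle {

    /** Builds the rabbitmqctl command line for the given watermark fraction. */
    static List<String> command(String fraction) {
        return Arrays.asList("rabbitmqctl", "set_vm_memory_high_watermark", fraction);
    }

    /** Runs the command, forwarding its output, and returns the exit code. */
    static int run(List<String> cmd) throws IOException, InterruptedException {
        return new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }

    public static void main(String[] args) throws Exception {
        run(command("0.000001")); // lower the threshold to provoke the alarm
        try {
            // Hypothetical window in which the application publishes and
            // the blocked callback is expected to fire.
            Thread.sleep(30_000);
        } finally {
            run(command("0.4")); // restore the default quoted in the answer
        }
    }
}
```

The try/finally block is there so the broker is not left with an unusably low watermark if the test is interrupted.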

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 vishakha.s