'How to trigger the Block and Unblock events in RabbitMQ?
I want to know when the RabbitMQ blocks or unblocks a connection so i added the following to my application:-
1- added the blocked listener to the RabbitMQ connection and implemented the handleUnblocked() and handleBlocked() methods.
2- i tried to make the brocker low on resources by sending 100k messages each of size 1MB.
3- added a delay inside the consumer so that the messages can't be consumed fast.
But still the handleBlocked() not called.
Here is my code:-
Producer
@Component
class Producer {
private RabbitTemplate template;
@Autowired
public Producer(RabbitTemplate template) {
this.template = template;
}
public void sendMessage(String exchange, String routingKey, String message) {
this.template.convertAndSend(exchange, routingKey, message);
}
}
Consumer
@Component
class Consumer {
@RabbitListener(queues = "${apress.amqp.queue}")
public void process(String message) throws InterruptedException {
Thread.sleep(5000L);
}
}
Configuration
@Configuration
class AMQPConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(Channel channel) throws Exception {
channel.getConnection().addBlockedListener(new BlockedListener() {
public void handleUnblocked() throws IOException {
System.out.println("UnBlocked");
}
public void handleBlocked(String reason) throws IOException {
System.out.println("Blocked");
}
});
//when u stop the rabbitMQ service, this event will be triggered.
channel.getConnection().addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
System.out.println("Rabbit Shutdown");
}
});
return null;
}
});
return template;
}
//this bean is used to create the queue programmatically..
@Bean
public Queue queue(@Value("${apress.amqp.queue}") String queueName) {
return new Queue(queueName, false);
}
}
Application Main
@SpringBootApplication
public class AmqpDemoApplication {
public static void main(String[] args) {
SpringApplication.run(AmqpDemoApplication.class, args);
}
@Bean
CommandLineRunner simple(@Value("${apress.amqp.exchange:}") String exchange,
@Value("${apress.amqp.queue}") String routingKey, Producer producer) {
return args -> {
for(int i = 0; i < 100000; i++) {
//send a big size messages to consume the memory and
//to force the rabbitMQ to make queue enters the "flow" state
String messageOfSize1MB = ExamplesUtil.createDataSize(1024);
producer.sendMessage(exchange, routingKey, messageOfSize1MB);
}
};
}
}
Util class
class ExamplesUtil {
/**
* Creates a message of size @msgSize in KB.
*/
public static String createDataSize(int msgSize) {
// Java chars are 2 bytes
msgSize = msgSize / 2;
msgSize = msgSize * 1024;
StringBuilder sb = new StringBuilder(msgSize);
for (int i = 0; i < msgSize; i++) {
sb.append('a');
}
return sb.toString();
}
}
application.properties
apress.amqp.queue=spring-boot-queue
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.javaworld</groupId>
<artifactId>code-snippet</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
Solution 1:[1]
You can login to rabbitmqctl and execute rabbitmqctl set_vm_memory_high_watermark 0.000001. With this you should be able to test blocked event on existing connections.
As per documentation here default value is set to 40%. so once you are done with tests you can then execute
rabbitmqctl set_vm_memory_high_watermark 0.4 This should then unblock the connection.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | vishakha.s |
