'How can I make a C# rabbitmq client to work with a spring amqp server?
I am currently working on a project where I have to connect a C# client to a Spring Server using RPC over RabbitMQ. The problem i'm having is the spring server not replying back through the reply queue as it should. It receives the message, but when I use the rabbitTemplate.sendAndReceive(), in order to reply to the client, it either doesn't do anything or it crashes by going into an infinite exception loop, that looks like this:
2022-04-17 18:01:45.730 WARN 25836 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.fantastik4.applicationtier.rabbitmqserver.server.UserMQServer.getUser(org.springframework.amqp.core.Message)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:207) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1665) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1584) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1572) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1563) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1507) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) ~[spring-rabbit-2.4.3.jar:2.4.3]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
Caused by: org.springframework.amqp.core.AmqpMessageReturnedException: Message returned
at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$handleReturn$15(RabbitTemplate.java:2600) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.core.RabbitTemplate.handleReturn(RabbitTemplate.java:2615) ~[spring-rabbit-2.4.3.jar:2.4.3]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.lambda$handle$2(PublisherCallbackChannelImpl.java:1088) ~[spring-rabbit-2.4.3.jar:2.4.3]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
... 1 common frames omitted
This is the code for the RabbitMQConfig:
@Configuration
public class RabbitMQConfig {
public static final String QUEUE = "prison.users";
public static final String RPC_EXCHANGE = "sep3.prison";
@Bean
public Queue msgQueue(){
return new Queue(QUEUE);
}
@Bean
public DirectExchange exchange(){
return new DirectExchange(RPC_EXCHANGE);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(msgQueue())
.to(exchange())
.with(QUEUE);
}
And then the listener, or server, in this case:
@Component
public class UserMQServer {
private RabbitTemplate rabbitTemplate;
@Autowired
public UserMQServer( RabbitTemplate rabbitTemplate) {
this.rabbitTemplate=rabbitTemplate;
}
@RabbitListener(queues="prison.users")
public void getUser(Message message){
System.out.println(new String(message.getBody()));
byte[] body = message.getBody();
System.out.println(message.getMessageProperties().getReplyTo());
Message build = MessageBuilder.withBody(("I am the server, I received the message from the client:" + new String(body)).getBytes()).build();
CorrelationData correlationData = new CorrelationData(message.getMessageProperties().getCorrelationId());
rabbitTemplate.sendAndReceive("","amq.rabbitmq.reply-to",build,correlationData);
}
}
This is the code for the C# Client:
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMQClientTest;
public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string queueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RpcClient(string queueName)
{
this.queueName = queueName;
var factory = new ConnectionFactory()
{
HostName = "localhost",
};
connection = factory.CreateConnection();
channel = connection.CreateModel();
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = "amq.rabbitmq.reply-to";
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body.ToArray());
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicConsume(
consumer: consumer,
queue: "amq.rabbitmq.reply-to",
autoAck: true);
channel.BasicPublish(
exchange: "",
routingKey: queueName,
basicProperties: props,
body: messageBytes);
return respQueue.Take();
}
public void Close()
{
connection.Close();
}
}
I have searched far and wide for what exactly I'm doing wrong, I've tried using a fixed reply queue, instead of the direct-reply-to, but still to no avail. Any answer that might help clarify this will be extremely welcome.
NOTE: I have tried and succeeded to make a C# client and C# server work together, same with spring. Only making them communicate from different environments seems to be the issue.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
