.NET RabbitMQ: creating queues dynamically
I am using RabbitMQ as the message broker in a project developed on .NET 5. I have 10,000 messages, and I want each queue to carry a maximum of 100 messages, with the queues processed in parallel (simultaneously). To achieve this, I configured each queue to hold at most 100 messages and used x-dead-letter-exchange and x-dead-letter-routing-key to name the queue that should receive messages once the maximum limit is reached. However, when a queue reaches its limit, messages are not transferred to a different queue; they continue to be added to the same queue. A few code examples are below. I look forward to your help on the matter.
Rabbitmq Initialize Service:
public class RabbitMqService : IRabbitMqService
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private readonly IModel _channel;
/// <summary>
/// Opens a connection and a channel to the broker on localhost and declares the
/// durable base queue "my_queue_0", limited to 100 messages, whose dead-lettered
/// messages are routed to the queue "my_queue_1".
/// </summary>
public RabbitMqService()
{
    _factory = new ConnectionFactory() { HostName = "localhost" };
    _connection = _factory.CreateConnection();
    _channel = _connection.CreateModel();

    var args = new Dictionary<string, object>
    {
        //["x-overflow"] = "reject-publish",
        ["x-max-length"] = 100,
        // "x-dead-letter-exchange" must name an EXCHANGE, not a queue. The default
        // exchange ("") routes a message to the queue whose name equals the routing
        // key, so this pair forwards dead-lettered messages to "my_queue_1".
        // NOTE(review): "my_queue_1" has to exist when overflow happens, otherwise
        // the dead-lettered messages are dropped by the broker.
        ["x-dead-letter-exchange"] = "",
        ["x-dead-letter-routing-key"] = "my_queue_1",
    };

    // NOTE(review): if "my_queue_0" already exists with different arguments, the
    // broker rejects this declaration; delete the old queue before redeploying.
    _channel.QueueDeclare(
        queue: "my_queue_0",
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: args);
}
Sender Service:
/// <summary>
/// Declares any missing "my_queue_N" overflow queues and publishes every item of
/// <paramref name="list"/> as a persistent JSON message to "my_queue_0".
/// </summary>
/// <param name="list">Items to serialize and publish.</param>
/// <param name="cancellationToken">Accepted for the caller's benefit; not currently observed by this method.</param>
/// <exception cref="ArgumentNullException"><paramref name="list"/> is null.</exception>
public async Task AddQueueAsync(List<MyDto> list, CancellationToken cancellationToken)
{
    if (list is null)
    {
        throw new ArgumentNullException(nameof(list));
    }

    const string queuePrefix = "my_queue_";

    var queueCount = list.Count / 10;
    var queueList = await GetQueueList(); //By Using Http client

    // Find the highest existing numeric suffix. Comparing the names as strings
    // would rank "my_queue_9" above "my_queue_10", so compare the parsed numbers.
    // Falls back to 0 (the base queue) when no numbered queue is reported.
    var indexQueue = 0;
    foreach (var name in queueList.Select(x => x.name))
    {
        if (name is null || !name.StartsWith(queuePrefix, StringComparison.Ordinal))
        {
            continue;
        }

        var suffix = name.Substring(queuePrefix.Length);
        if (int.TryParse(
                suffix,
                System.Globalization.NumberStyles.None,
                System.Globalization.CultureInfo.InvariantCulture,
                out var parsed)
            && parsed > indexQueue)
        {
            indexQueue = parsed;
        }
    }

    for (int iqueue = indexQueue + 1; iqueue < queueCount; iqueue++)
    {
        var nextQueue = $"{queuePrefix}{iqueue + 1}";
        var args = new Dictionary<string, object>
        {
            ["x-max-length"] = 10,
            // The dead-letter exchange must be an exchange name; the default
            // exchange ("") delivers to the queue named by the routing key.
            // NOTE(review): the last queue in the chain points at a queue this
            // loop does not declare; its overflow is dropped unless it exists.
            ["x-dead-letter-exchange"] = "",
            ["x-dead-letter-routing-key"] = nextQueue,
        };

        _channel.QueueDeclare(
            queue: $"{queuePrefix}{iqueue}",
            durable: true,
            exclusive: false,
            autoDelete: true,
            arguments: args);
    }

    // The same persistent-delivery properties apply to every message.
    var properties = _channel.CreateBasicProperties();
    properties.Persistent = true;
    const string routingKey = "my_queue_0";

    foreach (var item in list)
    {
        var message = JsonConvert.SerializeObject(item);
        var body = Encoding.UTF8.GetBytes(message);

        _channel.BasicPublish(exchange: "",
            routingKey: routingKey,
            basicProperties: properties,
            body: body);
    }

    // No try/catch: the previous "catch { throw ex; }" only reset the stack trace.
}
Receiver Service:
/// <summary>
/// Consumes messages from every queue whose name contains "my_queue", passes each
/// deserialized <c>MyDto</c> to <c>CallService</c>, and acks or nacks the delivery
/// depending on the outcome.
/// </summary>
// NOTE(review): ExecuteAsync is declared "override" but no base type is visible in
// this snippet; presumably the class derives from a hosted-service base - confirm.
public class ReceiverService
{
    protected IConnection _connection;
    protected IModel _channel;

    /// <summary>
    /// Resolves the shared RabbitMQ service and takes its connection and channel.
    /// </summary>
    public ReceiverService()
    {
        IRabbitMqService _rabbitMqService = ServiceTool.ServiceProvider.GetRequiredService<IRabbitMqService>();
        _connection = _rabbitMqService.GetIConnection();
        _channel = _rabbitMqService.GetChannel();
    }

    /// <summary>
    /// Registers one consumer on all matching queues, or releases the channel and
    /// connection and exits if cancellation was already requested.
    /// </summary>
    /// <param name="stoppingToken">Checked once, before consumption is set up.</param>
    /// <returns>A task that completes once the consumer has been registered.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.Run(async () =>
        {
            if (stoppingToken.IsCancellationRequested)
            {
                _channel.Dispose();
                _connection.Dispose();
                // Stop here: the channel must not be used after it is disposed.
                return;
            }

            var consumer = new EventingBasicConsumer(_channel);
            // NOTE(review): management URL and credentials are hard-coded; move to configuration.
            var client = new ManagementClient("http://localhost", "guest", "guest"); //EasyNetQ

            consumer.Received += async (model, ea) =>
            {
                // This handler is effectively "async void": an exception escaping it
                // cannot be observed by any caller, so failures are turned into a nack.
                bool succeeded;
                try
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var messageDto = JsonConvert.DeserializeObject<MyDto>(message);
                    string result = await CallService(messageDto);
                    succeeded = !string.IsNullOrEmpty(result);
                }
                catch (Exception)
                {
                    succeeded = false;
                }

                if (succeeded)
                {
                    _channel.BasicAck(ea.DeliveryTag, false);
                }
                else
                {
                    // requeue: false - the message is not returned to this queue.
                    _channel.BasicNack(ea.DeliveryTag, false, false);
                }
            };

            var queueList = await client.GetQueuesAsync();
            foreach (var queue in queueList.Where(x => x.Name.Contains("my_queue")))
            {
                _channel.BasicConsume(queue: queue.Name, autoAck: false, consumer: consumer);
            }
        });
    }
}
Thanks for help.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
