'Dotnet RabbitMq Create queue dynamically

For the RabbitMq message broker system that I will use in my project that I developed in the .Net 5 environment; I have 10000 messages and I want each queue to carry a maximum of 100 messages. I want them to work in parallel or simultaneously. For this, I have given a policy to carry a maximum of 100 messages to each queue and defined the queue (x-dead-letter-exchange or x-dead-letter-routing-key) that will transfer messages when the maximum limit is reached. However, when the system reaches the maximum limit, it does not transfer to a different queue and continues to add to the same queue. I have a few code examples below. I look forward to your help on the matter.

Rabbitmq Initialize Service:

public class RabbitMqService : IRabbitMqService
{
        private readonly ConnectionFactory _factory;
        private readonly IConnection _connection;
        private readonly IModel _channel;
        public RabbitMqService()
        {
            _factory = new ConnectionFactory() { HostName = "localhost" };

            _connection = _factory.CreateConnection();

            _channel = _connection.CreateModel();

            Dictionary<String, Object> args = new Dictionary<String, Object>();
            //args.Add("x-overflow", "reject-publish");
            args.Add("x-max-length", 100);
            args.Add("x-dead-letter-routing-key", "my_queue_1");
            args.Add("x-dead-letter-exchange", "my_queue_1");

            _channel.QueueDeclare(
             queue: "my_queue_0",
             durable: true,
             exclusive: false,
             autoDelete: false,
             arguments: args);
}

Sender Service:


   public async Task AddQueueAsync(List<MyDto> list, CancellationToken cancellationToken)
        {
            try
            {
                var queueCount = list.Count / 10;
                var queueList = await GetQueueList(); //By Using Http client
                var lastQueue = queueList.Select(x => x.name).OrderByDescending(x => x).FirstOrDefault(x => x.StartsWith("my_queue"));
                var indexQueue = int.Parse(lastQueue.Split("_").Last());

                for (int iqueue = (indexQueue + 1); iqueue < queueCount; iqueue++)
                {

                    Dictionary<String, Object> args = new Dictionary<String, Object>();
                    args.Add("x-max-length", 10);
                    args.Add("x-dead-letter-exchange", "my_queue_" + (iqueue + 1));
                    args.Add("x-dead-letter-routing-key", "my_queue_" + (iqueue + 1));

                    _channel.QueueDeclare(
                      queue: "my_queue_" + iqueue,
                      durable: true,
                      exclusive: false,
                      autoDelete: true,
                      arguments: args);
                }
                foreach (var item in list)
                {
                    var message = JsonConvert.SerializeObject(item);
                    var body = Encoding.UTF8.GetBytes(message);
                    var properties = _channel.CreateBasicProperties();
                    properties.Persistent = true;
                    var routingKey = $"my_queue_0";

                    _channel.BasicPublish(exchange: "",
                                   routingKey: routingKey,
                                   basicProperties: properties,
                                   body: body);
                }
            }
            catch (Exception ex)
            {

                throw ex;
            }

        }

Receiver Service:

 public class ReceiverService
    {

        protected IConnection _connection;
        protected IModel _channel;
        public ReceiverService()
        {
            IRabbitMqService _rabbitMqService = ServiceTool.ServiceProvider.GetRequiredService<IRabbitMqService>();
            _connection = _rabbitMqService.GetIConnection();
            _channel = _rabbitMqService.GetChannel();
        }
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            return Task.Run(async () =>
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    _channel.Dispose();
                    _connection.Dispose();
                }

                var consumer = new EventingBasicConsumer(_channel);
                var client = new ManagementClient("http://localhost", "guest", "guest"); //EasyNetQ

                consumer.Received += async (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var messageDto= JsonConvert.DeserializeObject<MyDto>(message);

                    string result = await CallService(messageDto);
                    if (!string.IsNullOrEmpty(result))
                    {
                        _channel.BasicAck(ea.DeliveryTag, false);

                    }
                    else
                    {
                        _channel.BasicNack(ea.DeliveryTag, false, false);
                    }
                };

                var queueList = await client.GetQueuesAsync();
                var myQueueList = queueList.Where(x => x.Name.Contains("my_queue")).ToList();
                if (myQueueList.Count > 0)
                {
                    foreach (var queue in myQueueList)
                    {
                        _channel.BasicConsume(queue: queue.Name, autoAck: false, consumer: consumer);
                    }
                }
            });

        }
    }

Thanks for help.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source