'Create custom exchange-to-exchange binding using MassTransit

I have microservice-based system which works with documents. Service publishes DocflowErrorMq, ImportedDocflowMq events, and other services are subscribed to these events. Critical service DocflowRegistry should process messages quickly, so we have to introduce multiple consumers. On the other hand message order shouldn't be broken and competing consumer doesn't suite. Consistent hash exchange distributes messages by routing key equals to document id, messages related to one document goes to one queue. So, we have simple manual scaling. I can't create binding between MqModels.Docflows:ImportedDocflowMq and docflow-process-dr exchanges (marked red on Diagram). Is it possible to create it with MassTransit?

DocflowRegistry service config:

services.AddMassTransit(x =>
        {
            x.AddConsumer<DocflowSendingErrorTestConsumer>();
            x.AddConsumer<DocflowImportTestConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                var virtualHost = configuration["RabbitMq:Settings:VirtualHost"] ?? "/";
                cfg.Host(configuration["RabbitMqHost"], virtualHost, h =>
                {
                    h.Username(configuration["RabbitMqUserName"]);
                    h.Password(configuration["RabbitMqPassword"]);
                });

                cfg.ReceiveEndpoint("docflow.process-1.docflowregistry", e =>
                {
                    e.ConfigureConsumer<DocflowSendingErrorTestConsumer>(context);
                    e.ConfigureConsumer<DocflowImportTestConsumer>(context);

                    e.Bind("docflow-process-dr", x =>
                    {
                        x.Durable = true;
                        x.AutoDelete = false;
                        x.ExchangeType = "x-consistent-hash";
                        x.RoutingKey = "1";
                    });

                    e.ConfigureConsumeTopology = false;
                    e.SingleActiveConsumer = true;
                });

                cfg.ReceiveEndpoint("docflow.process-2.docflowregistry", e =>
                {
                    e.ConfigureConsumer<DocflowSendingErrorTestConsumer>(context);
                    e.ConfigureConsumer<DocflowImportTestConsumer>(context);

                    e.Bind("docflow-process-dr", x =>
                    {
                        x.Durable = true;
                        x.AutoDelete = false;
                        x.ExchangeType = "x-consistent-hash";
                        x.RoutingKey = "1";
                    });

                    e.ConfigureConsumeTopology = false;
                    e.ConcurrentMessageLimit = 1;
                    e.SingleActiveConsumer = true;
                });                   
            });
        });

Config of TodoList service:

services.AddMassTransit(x =>
        {
            x.AddConsumer<DocflowSendingErrorTestConsumer>();
            x.AddConsumer<DocflowImportTestConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                var virtualHost = configuration["RabbitMq:Settings:VirtualHost"] ?? "/";
                cfg.Host(configuration["RabbitMqHost"], virtualHost, h =>
                {
                    h.Username(configuration["RabbitMqUserName"]);
                    h.Password(configuration["RabbitMqPassword"]);
                });

                cfg.ReceiveEndpoint("docflow-process-todolist", e =>
                {
                    e.ConfigureConsumer<DocflowSendingErrorTestConsumer>(context);
                    e.ConfigureConsumer<DocflowImportTestConsumer>(context);

                    e.SingleActiveConsumer = true;
                });
            });
        });

Publish code:

var endPoint = await _massTransitBus.GetPublishSendEndpoint<DocflowErrorMq>();
        var docflowGuid = Guid.NewGuid();
        await endPoint.Send(new DocflowErrorMq
        {
            DocflowId = docflowGuid,
            AbonentId = Guid.NewGuid()
        },
        context =>
        {
            context.SetRoutingKey(docflowGuid.ToString());
        });


Solution 1:[1]

Create an interface, DocflowProcessDr, and make each of those message contracts published implement it. Then, you can configure the publish topology for that interface in the bus:

cfg.Message<DocflowProcessDr>(x => x.SetEntityName("docflow-process-dr"));
cfg.Publish<DocflowProcessDr>(x =>
{
    x.ExchangeType = "x-consistent-hash";
});

Since MassTransit will create a polymorphic topology on the broker, you'll have an exchange-to-exchange binding between the published type and the interface.

Then, just publish the message:

var docflowGuid = Guid.NewGuid();
var endPoint = await _massTransitBus.Publish<DocflowErrorMq>(new DocflowErrorMq
{
    DocflowId = docflowGuid,
    AbonentId = Guid.NewGuid()
},
context =>
{
    context.SetRoutingKey(docflowGuid.ToString());
});

Calling GetPublishSendEndpoint<T>() is weird, don't encourage it.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Chris Patterson