'Inmemoryoutbox not work with a masstransit jobconsumers jobcontext
I am using a JobConsumer to select a batch of items to be processed in a different process. For each item to process a new message is publish. I am using an inmemoryoutbox but the messages are published directly to rabbitmq. Actually I want all these messages to be collected in a memoryoutbox and all send after the loop has succeeded. When I debug my code the PublishEndpointProvider has a Masstransit.Middleware.InMemoryOutbix.InMemoryOutboxPublishEndpointProvider. But the messages are send directly to RabbitMq.
I am using Masstransit version 8.0.1.
Any idea what is missing in the configuration?
configuration
services.AddMassTransit(mt =>
{
mt.AddDelayedMessageScheduler();
mt.AddConsumersFromNamespaceContaining<RunConsumer>();
mt.UsingRabbitMq((context, cfg) =>
{
cfg.UseDelayedMessageScheduler();
cfg.Host(rabbitMqConfig.Host, host =>
{
host.Username(rabbitMqConfig.Username);
host.Password(rabbitMqConfig.Password);
host.Heartbeat(rabbitMqConfig.Heartbeat);
});
cfg.UseGlobalRetryPolicy();
cfg.UseInMemoryOutbox();
var options = new ServiceInstanceOptions()
.EnableJobServiceEndpoints();
cfg.ServiceInstance(options, instance =>
{
instance.ConfigureJobServiceEndpoints(x =>
{
x.JobServiceStateEndpointName = "JobType";
x.JobServiceJobAttemptStateEndpointName = "JobAttempt";
x.JobServiceJobStateEndpointName = "Job";
});
instance.ConfigureEndpoints(context);
});
});
});
services.AddHostedService<MassTransitConsoleHostedService>();
return services;
Consumer definition
public class RunConsumerDefinition :
ConsumerDefinition<RunConsumer>
{
private IBusRegistrationContext context;
public ProlongeerPeriodeConsumerDefinition(IBusRegistrationContext context)
{
this.context = context;
EndpointName = 'MyQueue';
ConcurrentMessageLimit = 1;
}
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<RunConsumer> consumerConfigurator)
{
consumerConfigurator.Options<JobOptions<RunConsumer>>(options => options
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(2));
endpointConfigurator.UseMessageScope(context);
endpointConfigurator.UseInMemoryOutbox();
}
}
RunConsumer
public async Task Run(JobContext<Run> context)
{
foreach (var index in Enumerable.Range(1, 7))
{
var command = new ProcessCommand();
await context.Publish<ProcessCommand>(command, context.CancellationToken);
}
}
Solution 1:[1]
Job Consumers don't technically run as a consumer, they're run separately with a JobContext<T>. As such, they don't use any of the retry, outbox, or other filters from the pipeline.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Chris Patterson |
