InMemoryOutbox does not work with a MassTransit JobConsumer's JobContext

I am using a JobConsumer to select a batch of items to be processed in a different process. For each item to process, a new message is published. I am using an in-memory outbox, but the messages are published directly to RabbitMQ. What I actually want is for all these messages to be collected in the in-memory outbox and sent only after the loop has succeeded. When I debug my code, the PublishEndpointProvider is a MassTransit.Middleware.InMemoryOutbox.InMemoryOutboxPublishEndpointProvider, but the messages are still sent directly to RabbitMQ.

I am using MassTransit version 8.0.1.

Any idea what is missing in the configuration?

Configuration

services.AddMassTransit(mt =>
{
  mt.AddDelayedMessageScheduler();
  mt.AddConsumersFromNamespaceContaining<RunConsumer>();

  mt.UsingRabbitMq((context, cfg) =>
  {
    cfg.UseDelayedMessageScheduler();
    cfg.Host(rabbitMqConfig.Host, host =>
    {
      host.Username(rabbitMqConfig.Username);
      host.Password(rabbitMqConfig.Password);
      host.Heartbeat(rabbitMqConfig.Heartbeat);
    });

    cfg.UseGlobalRetryPolicy();
    cfg.UseInMemoryOutbox();
    var options = new ServiceInstanceOptions()
                     .EnableJobServiceEndpoints();
    cfg.ServiceInstance(options, instance =>
    {
      instance.ConfigureJobServiceEndpoints(x =>
      {
        x.JobServiceStateEndpointName = "JobType";
        x.JobServiceJobAttemptStateEndpointName = "JobAttempt";
        x.JobServiceJobStateEndpointName = "Job";
      });
      instance.ConfigureEndpoints(context);
    });
  });
});
services.AddHostedService<MassTransitConsoleHostedService>();
return services;

Consumer definition

public class RunConsumerDefinition :
  ConsumerDefinition<RunConsumer>
{
  private readonly IBusRegistrationContext context;

  // The constructor name must match the class name.
  public RunConsumerDefinition(IBusRegistrationContext context)
  {
    this.context = context;
    EndpointName = "MyQueue";
    ConcurrentMessageLimit = 1;
  }

  protected override void ConfigureConsumer(
    IReceiveEndpointConfigurator endpointConfigurator,
    IConsumerConfigurator<RunConsumer> consumerConfigurator)
  {
    consumerConfigurator.Options<JobOptions<RunConsumer>>(options => options
      .SetJobTimeout(TimeSpan.FromMinutes(15))
      .SetConcurrentJobLimit(2));
    endpointConfigurator.UseMessageScope(context);
    endpointConfigurator.UseInMemoryOutbox();
  }
}

RunConsumer

public async Task Run(JobContext<Run> context)
{
  foreach (var index in Enumerable.Range(1, 7))
  {
    var command = new ProcessCommand();
    await context.Publish<ProcessCommand>(command, context.CancellationToken);
  }
}



Solution 1:[1]

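Job consumers don't technically run as consumers; they are run separately with a JobContext<T>. As such, they don't use any of the retry, outbox, or other filters from the pipeline. That is why messages published from Run go straight to RabbitMQ even though UseInMemoryOutbox is configured.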
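
Since the outbox filter is not applied to the job, one possible workaround is to do the buffering by hand: collect the messages in a list while the loop runs and publish them only once the loop has finished. The following is a minimal sketch of that idea, not something the answer itself prescribes. The Run and ProcessCommand records are hypothetical stand-ins for the contracts in the question, and the sketch assumes building the commands is the part that may fail.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts standing in for the ones in the question.
public record Run;
public record ProcessCommand;

public class RunConsumer : IJobConsumer<Run>
{
    public async Task Run(JobContext<Run> context)
    {
        // Build every command first; nothing is sent while this loop runs.
        var commands = new List<ProcessCommand>();
        foreach (var index in Enumerable.Range(1, 7))
        {
            commands.Add(new ProcessCommand());
        }

        // Publish only after the loop above has completed without throwing.
        foreach (var command in commands)
        {
            await context.Publish(command, context.CancellationToken);
        }
    }
}
```

Note that this only defers the publishing; it is not transactional. If one Publish call fails partway through the second loop, the messages published before it have already been sent.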

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Chris Patterson