MassTransit - Mediator fails to respond to a scoped background service on ASP.NET Core

I have an ASP.NET Core app that hosts a background service which periodically sends messages to the MassTransit mediator. The problem is that when the consumer tries to send a response back, it always fails with: loopback://localhost/response => The message was not consumed

There is no timeout; the failure is immediate.

Stack trace:

    at MassTransit.Pipeline.Filters.FaultDeadLetterFilter.Send(ReceiveContext context, IPipe`1 next)
    at GreenPipes.Pipes.FilterPipe`1.GreenPipes.IPipe<TContext>.Send(TContext context)
    at MassTransit.Pipeline.Filters.DeadLetterFilter.<GreenPipes-IFilter<MassTransit-ReceiveContext>-Send>d__3.MoveNext()
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
    at MassTransit.Transports.ReceivePipeDispatcher.<Dispatch>d__17.MoveNext()
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at MassTransit.Transports.ReceivePipeDispatcher.<Dispatch>d__17.MoveNext()
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at MassTransit.Transports.ReceivePipeDispatcher.<Dispatch>d__17.MoveNext()
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
    at MassTransit.Mediator.Endpoints.MediatorSendEndpoint.<SendMessage>d__26`1.MoveNext()
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at MassTransit.Mediator.Endpoints.MediatorSendEndpoint.<SendMessage>d__26`1.MoveNext()
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
    at Service.Recieve.<Consume>d__7.MoveNext() in C:\...\Recieve.cs

In my startup I set up the mediator and the background service:

        services.AddMediator(cfg =>
        {
            cfg.AddRequestClient<Service.Command.RecieveCommand>();
            cfg.AddConsumer<Service.Recieve>();
        });

        services.AddHostedService<RecieveMarketingMessagesService>();
        services.AddScoped<IRecieveScopedMarketingMessageService, RecieveScopedMarketingMessageService>();

My background service creates a new scope on every run:

public class RecieveMarketingMessagesService : BackgroundService, IDisposable
{
    private readonly IServiceProvider serviceProvider;

    public RecieveMarketingMessagesService(IServiceProvider serviceProvider)
    {
        this.serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            using (var scope = serviceProvider.CreateScope())
            {
                var service = scope.ServiceProvider.GetRequiredService<IRecieveScopedMarketingMessageService>();
                await service.Process(stoppingToken);
            }
        }
    }
}

From that scope it resolves this scoped service:

public class RecieveScopedMarketingMessageService : IRecieveScopedMarketingMessageService
{
    private readonly ILogger logger;
    private readonly IRequestClient<RecieveCommand> client;

    public RecieveScopedMarketingMessageService(ILoggerFactory loggerFactory, IRequestClient<RecieveCommand> client)
    {
        this.logger = loggerFactory.CreateLogger<RecieveScopedMarketingMessageService>();
        this.client = client;
    }

    public async Task Process(CancellationToken stoppingToken)
    {
        try
        {
            await client.GetResponse<RecieveResponse>(new RecieveCommand(), cancellationToken: stoppingToken);
        }
        catch (Exception ex)
        {
            logger.LogCritical(ex, "---------FAIL FAIL----------");
        }
    }
}

Inside the scoped service I inject the request client and call 'GetResponse'.

Here is the consumer:

public class Recieve : IConsumer<RecieveCommand>
{
    private readonly ILogger logger;

    public Recieve(ILoggerFactory loggerFactory)
    {
        this.logger = loggerFactory.CreateLogger<Recieve>();
    }

    public async Task Consume(ConsumeContext<RecieveCommand> context)
    {
        try
        {
            /* HERE BE SOME WORK ... */
            
            await context.RespondAsync(new RecieveResponse()); /* <------ HERE ALWAYS FAILS */
        }
        catch(Exception ex)
        {
            
        }
    }
}

I have no idea where the problem is. Thanks for any help.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
