MassTransit - Mediator fails to respond to a scoped background service on ASP.NET Core
I have an ASP.NET Core app. I need to host a background service that periodically sends messages to the mediator. The problem is that when the consumer tries to send a response back, it always fails with: loopback://localhost/response => The message was not consumed
There is no timeout; the failure is immediate.
StackTrace:
at MassTransit.Pipeline.Filters.FaultDeadLetterFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Pipes.FilterPipe`1.GreenPipes.IPipe<TContext>.Send(TContext context)
at MassTransit.Pipeline.Filters.DeadLetterFilter.<GreenPipes-IFilter<MassTransit-ReceiveContext>-Send>d__3.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at MassTransit.Transports.ReceivePipeDispatcher.<Dispatch>d__17.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at MassTransit.Transports.ReceivePipeDispatcher.<Dispatch>d__17.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at MassTransit.Transports.ReceivePipeDispatcher.<Dispatch>d__17.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at MassTransit.Mediator.Endpoints.MediatorSendEndpoint.<SendMessage>d__26`1.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at MassTransit.Mediator.Endpoints.MediatorSendEndpoint.<SendMessage>d__26`1.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Service.Recieve.<Consume>d__7.MoveNext() in C:\...\Recieve.cs
In my startup I set up the mediator and the background service:
services.AddMediator(cfg =>
{
    cfg.AddRequestClient<Service.Command.RecieveCommand>();
    cfg.AddConsumer<Service.Recieve>();
});
services.AddHostedService<RecieveMarketingMessagesService>();
services.AddScoped<IRecieveScopedMarketingMessageService, RecieveScopedMarketingMessageService>();
My background service creates a new scope on every run:
public class RecieveMarketingMessagesService : BackgroundService, IDisposable
{
    private readonly IServiceProvider serviceProvider;

    public RecieveMarketingMessagesService(IServiceProvider serviceProvider)
    {
        this.serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            using (var scope = serviceProvider.CreateScope())
            {
                var service = scope.ServiceProvider.GetRequiredService<IRecieveScopedMarketingMessageService>();
                await service.Process(stoppingToken);
            }
        }
    }
}
In each scope it resolves this scoped service:
public class RecieveScopedMarketingMessageService : IRecieveScopedMarketingMessageService
{
    private readonly ILogger logger;
    private readonly IRequestClient<RecieveCommand> client;

    public RecieveScopedMarketingMessageService(ILoggerFactory loggerFactory, IRequestClient<RecieveCommand> client)
    {
        this.logger = loggerFactory.CreateLogger<RecieveScopedMarketingMessageService>();
        this.client = client;
    }

    public async Task Process(CancellationToken stoppingToken)
    {
        try
        {
            await client.GetResponse<RecieveResponse>(new RecieveCommand(), cancellationToken: stoppingToken);
        }
        catch (Exception ex)
        {
            logger.LogCritical(ex, "---------FAIL FAIL----------");
        }
    }
}
Inside the scoped service I inject the request client and call 'GetResponse'.
Here is the consumer:
public class Recieve : IConsumer<RecieveCommand>
{
    private readonly ILogger logger;

    public Recieve(ILoggerFactory loggerFactory)
    {
        this.logger = loggerFactory.CreateLogger<Recieve>();
    }

    public async Task Consume(ConsumeContext<RecieveCommand> context)
    {
        try
        {
            /* HERE BE SOME WORK ... */
            await context.RespondAsync(new RecieveResponse()); /* <------ HERE ALWAYS FAILS */
        }
        catch (Exception ex)
        {
        }
    }
}
I have no idea where the problem is. Thanks for any help.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
