How to invoke a MassTransit consumer using reflection
I am building a transactional inbox pattern approach for "intercepting" messages, using an observer and a filter to log them to a database. The idea is that, if a consumer fails, a background worker periodically checks the database for any records where the integration event has failed, retrieves them, and re-executes them.
The data being stored in the DB looks like this:
/// <summary>
/// Database record of a received integration event, kept so that messages whose
/// consumer failed can later be retrieved and re-executed.
/// </summary>
/// <remarks>
/// Every setter is private. The constructor and the <c>Update</c> method that the
/// filter and observer call are not part of this snippet.
/// </remarks>
public class InboxMessage
{
/// <summary>Record identifier (presumably the database key; confirm against the entity mapping).</summary>
public long Id { get; private set; }
/// <summary>When the record was created (time zone not visible here; confirm).</summary>
public DateTime CreatedDate { get; private set; }
/// <summary>User recorded as the creator of the record.</summary>
public string CreatedUser { get; private set; }
/// <summary>When the record was last edited (time zone not visible here; confirm).</summary>
public DateTime EditedDate { get; private set; }
/// <summary>User recorded as the last editor of the record.</summary>
public string EditedUser { get; private set; }
/// <summary>Type name of the message; intended as the target type when deserialising <see cref="Data"/>.</summary>
public string MessageType { get; private set; }
/// <summary>Type name of the consumer to re-execute; null until a consumer has been recorded for the message.</summary>
public string? ConsumerType { get; private set; }
/// <summary>Serialised message payload (presumably the JSON written by the consume filter; confirm against the constructor).</summary>
public string Data { get; private set; }
/// <summary>Event identifier; matched against <c>IIntegrationEvent.EventId</c> when the record is looked up.</summary>
public Guid EventNumber { get; private set; }
/// <summary>Current processing status of the message.</summary>
public EventStatus Status { get; private set; }
}
The idea is to retrieve all messages with Status = Failed and use reflection (or perhaps something else?) to deserialise the "Data" prop to the MessageType. The ConsumerType would then be used to re-execute the consumer.
I use a filter to log the initial message (before it is transferred to the consumer):
/// <summary>
/// Consume-pipeline filter that records every incoming <see cref="IIntegrationEvent"/>
/// as an <see cref="InboxMessage"/> before passing the message on to the next pipe.
/// </summary>
/// <typeparam name="T">Message type flowing through the consume pipeline.</typeparam>
public class InboxPatternConsumerFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
    // Prefix stripped from the runtime type name before it is stored as the message type.
    private const string MassTransitDynamicTypeName = "MassTransit.DynamicInternal.";

    private readonly IntegrationEventsContext _context;
    private readonly ILogger _logger;

    /// <summary>Creates the filter.</summary>
    /// <param name="logger">Factory used to create the filter's logger.</param>
    /// <param name="context">Database context the inbox records are written to.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="context"/> is null.
    /// </exception>
    public InboxPatternConsumerFilter(ILoggerFactory logger, IntegrationEventsContext context)
    {
        // Guard the factory explicitly so a missing registration surfaces as an
        // ArgumentNullException instead of a NullReferenceException.
        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        _logger = logger.CreateLogger("InboxPatternConsumerFilter");
        _context = context ?? throw new ArgumentNullException(nameof(context));
    }

    /// <summary>
    /// Stores an inbox record for integration events, then always forwards the
    /// message to the next pipe.
    /// </summary>
    /// <param name="context">Consume context carrying the message.</param>
    /// <param name="next">Next pipe in the consume pipeline.</param>
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        if (context.Message is IIntegrationEvent @event)
        {
            try
            {
                // Log the interface that was actually matched by the guard above.
                _logger.LogInformation("Integration event is type of - {generic}. Applying inbox pattern.", nameof(IIntegrationEvent));

                var message = new InboxMessage(
                    @event.EditedUser ?? "unknown",
                    context.Message.GetType().FullName?.Replace(MassTransitDynamicTypeName, string.Empty) ?? string.Empty,
                    null, // third constructor argument is left unset at this stage
                    System.Text.Json.JsonSerializer.Serialize(context.Message),
                    @event.EventId,
                    EventStatus.Received);

                _context.InboxMessages.Add(message);
                await _context.SaveChangesAsync(context.CancellationToken);
            }
            catch (Exception ex)
            {
                // Deliberately best-effort: a failure to persist the inbox record
                // must not prevent the consumer from running.
                _logger.LogError(ex, "Failed to create inbox message");
            }
        }

        await next.Send(context);
    }

    /// <summary>Intentionally empty: the filter contributes nothing to the pipeline probe.</summary>
    public void Probe(ProbeContext context) {}
}
My reason for using a filter is to check the EventNumber to confirm whether the message already exists in the DB. This should allow me to avoid sending the message to the consumer again, which resolves the idempotency issue in cases where we use a retry mechanism for failed messages.
I use a basic ReceiveObserver to update the messages as follows:
/// <summary>
/// Receive observer that updates the stored <see cref="InboxMessage"/> for an
/// integration event once its consumer has completed or faulted.
/// </summary>
public class ReceiveObserver : IReceiveObserver
{
    private readonly IntegrationEventsContext _context;
    private readonly ILogger<ReceiveObserver> _logger;

    /// <summary>Creates the observer.</summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="context"/> or <paramref name="logger"/> is null.
    /// </exception>
    public ReceiveObserver(IntegrationEventsContext context, ILogger<ReceiveObserver> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _context = context ?? throw new ArgumentNullException(nameof(context));
    }

    /// <summary>Required by <see cref="IReceiveObserver"/>; nothing is recorded before receive.</summary>
    public Task PreReceive(ReceiveContext context) => Task.CompletedTask;

    /// <summary>Required by <see cref="IReceiveObserver"/>; nothing is recorded after receive.</summary>
    public Task PostReceive(ReceiveContext context) => Task.CompletedTask;

    /// <summary>
    /// Called when the message was consumed, once for each consumer. Marks the
    /// matching inbox record as <see cref="EventStatus.Completed"/>.
    /// </summary>
    /// <param name="context">Consume context carrying the message.</param>
    /// <param name="duration">Time the consumer took (unused).</param>
    /// <param name="consumerType">Type name of the consumer, stored on the record.</param>
    public async Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
    {
        try
        {
            if (context.Message is IIntegrationEvent @event)
            {
                await UpdateStatusAsync(@event, EventStatus.Completed, "post-consumer", consumerType);
            }
        }
        catch (Exception ex)
        {
            // The exception must be the first argument; otherwise it is treated as a
            // format argument and lost because the template has no placeholder.
            _logger.LogError(ex, "An error occurred trying to update the Message's complete status");
        }
    }

    /// <summary>
    /// Called when the message is consumed but the consumer throws an exception.
    /// Marks the matching inbox record as <see cref="EventStatus.Failed"/>.
    /// </summary>
    /// <param name="context">Consume context carrying the message.</param>
    /// <param name="elapsed">Time elapsed before the fault (unused).</param>
    /// <param name="consumerType">Type name of the consumer, stored on the record.</param>
    /// <param name="exception">Exception thrown by the consumer (unused).</param>
    public async Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan elapsed, string consumerType, Exception exception) where T : class
    {
        try
        {
            if (context.Message is IIntegrationEvent @event)
            {
                await UpdateStatusAsync(@event, EventStatus.Failed, "consumer-fault", consumerType);
            }
        }
        catch (Exception ex)
        {
            // Same best-effort policy as PostConsume: a bookkeeping failure must not
            // be thrown on top of the consumer's own fault.
            _logger.LogError(ex, "An error occurred trying to update the Message's failed status");
        }
    }

    /// <summary>
    /// Called when an exception occurs early in the message processing, such as deserialization.
    /// </summary>
    public Task ReceiveFault(ReceiveContext context, Exception exception)
    {
        // TODO: Get the message id and update the status in db.
        return Task.CompletedTask;
    }

    // Finds the inbox record whose EventNumber matches the event and applies the new status;
    // logs a warning when no record exists.
    private async Task UpdateStatusAsync(IIntegrationEvent @event, EventStatus status, string updateSource, string consumerType)
    {
        var message = await _context.InboxMessages.FirstOrDefaultAsync(x => x.EventNumber == @event.EventId);
        if (message is null)
        {
            _logger.LogWarning("Inbox Message not found");
            return;
        }

        message.Update(status, updateSource, consumerType);
        _context.Update(message);
        await _context.SaveChangesAsync();
    }
}
The idea is then to use a background service to check for any failed messages as follows:
/// <summary>
/// Hosted service that, on start-up, loads every inbox message that has not completed
/// and tries to re-execute its consumer through reflection.
/// </summary>
/// <remarks>
/// NOTE(review): the work runs inside <see cref="StartAsync"/>, so it executes once at
/// host start-up rather than periodically. The database context is injected directly
/// into this service; confirm that its registration lifetime allows that.
/// </remarks>
public class InboxMessageService : IHostedService
{
    private readonly IBusControl _bus;
    private readonly IntegrationEventsContext _context;
    private readonly ILogger<InboxMessageService> _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates the service.</summary>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public InboxMessageService(
        IBusControl bus,
        IntegrationEventsContext context,
        ILogger<InboxMessageService> logger,
        IServiceProvider serviceProvider)
    {
        // Each guard names its own parameter so the exception points at the right argument.
        _bus = bus ?? throw new ArgumentNullException(nameof(bus));
        _context = context ?? throw new ArgumentNullException(nameof(context));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    /// <summary>
    /// Loads all inbox messages whose status is not <see cref="EventStatus.Completed"/>
    /// and attempts to replay each one. A failure on one message is logged and does not
    /// stop the remaining messages from being processed.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the database query.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        List<InboxMessage> messages = await _context.InboxMessages
            .Where(x => x.Status != EventStatus.Completed)
            .ToListAsync(cancellationToken);

        foreach (InboxMessage message in messages)
        {
            try
            {
                await ReplayAsync(message);
            }
            catch (Exception ex)
            {
                // Previously swallowed silently; log it so failed replays are visible.
                _logger.LogError(ex, "Failed to re-execute inbox message {InboxMessageId}", message.Id);
            }
        }
    }

    /// <summary>No shutdown work is required.</summary>
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    // Resolves the stored consumer type, builds an instance from the service provider,
    // and invokes its Consume method.
    private async Task ReplayAsync(InboxMessage message)
    {
        if (message.ConsumerType is null)
        {
            _logger.LogWarning("Unable to find the consumer type to start this message.");
            return;
        }

        // Only the entry assembly is searched for the consumer type.
        Type? consumerType = System.Reflection.Assembly
            .GetEntryAssembly()?
            .GetType(message.ConsumerType);
        if (consumerType is null)
        {
            _logger.LogWarning(
                "Consumer type {ConsumerType} could not be resolved for inbox message {InboxMessageId}",
                message.ConsumerType,
                message.Id);
            return;
        }

        // We can always assume that the consumer will contain a ctor.
        var constructor = consumerType.GetConstructors().First();

        // Resolve every constructor parameter from the container; unregistered services resolve to null.
        object?[] arguments = constructor.GetParameters()
            .Select(parameter => _serviceProvider.GetService(parameter.ParameterType))
            .ToArray();

        var consumer = Activator.CreateInstance(consumerType, arguments);

        // TODO: figure out how to create a ConsumeContext<T> message from the DB data.
        // Until then no arguments are supplied, so a Consume method that expects a context
        // will fail to invoke and the failure is logged by the caller.
        object? result = consumerType.GetMethod("Consume")?.Invoke(
            consumer,
            System.Reflection.BindingFlags.InvokeMethod,
            Type.DefaultBinder,
            null,
            null);

        // Await the consumer's task so its completion and exceptions are observed.
        if (result is Task pending)
        {
            await pending;
        }
    }
}
The part I am struggling with is the TODO in the background service: figuring out how to recreate the message (using the MessageType and Data props) and invoke the consumer (using the ConsumerType).
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
