'How to save workitem of type Func<CancellationToken, Task> in DB in BackgroundService to call it again
I am trying to implement a separate service for running a heavy time taking method run in background. I am kind of trying to do something like Hangfire but I can't use Hangfire because of some reasons.
So I have implement a package which takes in the delegate of the method and queues it and run it in background and also retries for some count if any exceptions. So I want to add one more functionality to it is if while retrying application is stop. It should continue it's from that count.
For above this to work I need to save this Func<CancellationToken, Task> delegate to DB. So that whenever again when we start I can get it from DB and run with all the details like exact arguments value.
Below is the code for better understanding and it is to understand the flow it does not contain retry logic. This is BackgroundTaskQueue.cs
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
new ConcurrentQueue<Func<CancellationToken, Task>>();
private SemaphoreSlim _signal = new SemaphoreSlim(0);
private readonly ILogger<BackgroundTaskQueue> _logger;
public async Task QueueBackgroundWorkItemAsync(
Func<CancellationToken, Task> workItem, string jobId)
{
try{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
}
catch (Exception ex) {
_logger.LogError(ex.Message);
}
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
return workItem;
}
}
QueuedHostedService.cs
public class QueuedHostedService : BackgroundService
{
private readonly ILogger _logger;
private readonly IApplicationDbContext _dbContext;
private string jobId;
private Func<CancellationToken, Task>? workItem;
private CancellationToken cancellationToken;
public QueuedHostedService(IApplicationDbContext dbContext,
IBackgroundTaskQueue taskQueue,
ILoggerFactory loggerFactory)
{
TaskQueue = taskQueue;
_logger = loggerFactory.CreateLogger<QueuedHostedService>();
_dbContext = dbContext;
}
public IBackgroundTaskQueue TaskQueue { get; }
protected async override Task ExecuteAsync(
CancellationToken cancellationToken)
{
_logger.LogInformation("Queued Hosted Service is starting.");
this.cancellationToken = cancellationToken;
while (!this.cancellationToken.IsCancellationRequested)
{
workItem = await TaskQueue.DequeueAsync(cancellationToken);
await workItem(this.cancellationToken);
}
_logger.LogInformation("Queued Hosted Service is stopping.");
}
}
To make any method run in background we simply have to inject IBackgroundTaskQueue and call the method QueueBackgroundWorkItemAsync like below.
_queue.QueueBackgroundWorkItemAsync(async token => SomeMethod(arg1, arg2));
I want to make it same generic as Hangfire but I am unable to figure out what all fields I need to save to db from the Func<CancellationToken, Task>() and how do I get those fields. And another thing is how do I construct back that same Func<CancellationToken, Task>().
Solution 1:[1]
For above this to work I need to save this Func<CancellationToken, Task> delegate to DB.
This is how Hangfire works. It is one of the trickiest parts, and (IMO) it is also the worst part of the design. In particular, handling application upgrades can be tricky. It wasn't that long ago that Hangfire just flat-out didn't support rolling upgrades at all - now they do, but because of the delegate-serialization logic, they can only handle some types of application updates.
I recommend instead of trying to save a delegate, you have your code save a logical representation of the work to do. E.g.:
public interface IBackgroundWork { Task ExecuteAsync(CancellationToken token); }
public sealed class SomeBackgroundWork : IBackgroundWork
{
private readonly string _arg1;
private readonly string _arg2;
public SomeBackgroundWork(string arg1, string arg2)
{
_arg1 = arg1;
_arg2 = arg2;
}
public Task ExecuteAsync(CancellationToken token) =>
SomeMethodAsync(_arg1, _arg2);
}
Then serialize it appropriately (i.e., with a type field).
The disadvantage of this approach is that you need to define actual type representations for all of your background work item types. The advantage is that it fully captures all the logic without being obfuscated by delegate serialization, and it also is fully upgrade-safe, including rolling upgrades.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Stephen Cleary |
