'Can I write to the same server sent event stream from multiple async tasks?
I have an endpoint in an ASP.NET core project that initiates a process on an external service, and then polls that service for new info and sends this to a user client using server sent events. It also generates data that's dependent on the initial response, and this data needs to be sent to the client at a regular interval.
Here's a simplified version of the code so far:
[Route("/sse")]
public async Task Sse()
{
Response.Headers.Add("Content-Type", "text/event-stream");
(string orderId, string secret) beginResponse = await externalService.BeginProcess()
while (true)
{
// Data created internally that should be sent to client at a regular interval
await Response.WriteAsync("event: internalinfo\n");
await Response.WriteAsync($"data: {internalService.GenerateData(beginResponse.secret)}\n\n");
// Request to external service that is allowed to vary in response time
var externalResponse = await externalService.FetchInfo(beginResponse.orderId)
await Response.WriteAsync("event: externalinfo\n");
await Response.WriteAsync($"data: {externalResponse}\n\n");
if ("DONE".Equals(externalResponse))
{
return;
}
await Task.Delay(500)
}
}
Problem here is that the request to the external service can vary a lot in time, ruining the cadence of writes for the internally generated data. This in turn makes the entire process error prone.
I was thinking of separating the request to another asynchronous task that will write to the Response object whenever it gets something back from the external service, so that it won't block the internalinfo writes. How would I go about doing that though, and SHOULD I do that? If there's a better way to solve this issue I'd be happy to hear it!
Solution 1:[1]
I was thinking of separating the request to another asynchronous task that will write to the Response object whenever it gets something back from the external service, so that it won't block the internalinfo writes. How would I go about doing that though, and SHOULD I do that? If there's a better way to solve this issue I'd be happy to hear it!
Something like that should work fine. The only problem is that Response should only be accessed by one thread at a time. You wouldn't want your event: and data: sections to get interleaved between the internal and external processes. At a high level, you're taking your current single-producer code and splitting it so there's multiple producers.
So, one way to do this is to use mutual exclusion. You would split up your two producers and each one would only write while it holds the lock. E.g.:
Response.Headers.Add("Content-Type", "text/event-stream");
(string orderId, string secret) beginResponse = await externalService.BeginProcess();
var done = false;
var mutex = new SemaphoreSlim(1);
await Task.WhenAll(InternalProducerAsync(), ExternalProducerAsync());
async Task InternalProducerAsync()
{
while (!done)
{
await mutex.WaitAsync();
try
{
await Response.WriteAsync("event: internalinfo\n");
await Response.WriteAsync($"data: {internalService.GenerateData(beginResponse.secret)}\n\n");
}
finally { mutex.Release(); }
await Task.Delay(500);
}
}
async Task ExternalProducerAsync()
{
while (!done)
{
var externalResponse = await externalService.FetchInfo(beginResponse.orderId)
await mutex.WaitAsync();
try
{
await Response.WriteAsync("event: externalinfo\n");
await Response.WriteAsync($"data: {externalResponse}\n\n");
}
finally { mutex.Release(); }
if ("DONE".Equals(externalResponse))
{
done = true;
return;
}
await Task.Delay(500);
}
}
Another way to do this is to use an actual producer/consumer queue, such as System.Threading.Channels:
Response.Headers.Add("Content-Type", "text/event-stream");
(string orderId, string secret) beginResponse = await externalService.BeginProcess();
var channel = Channel.CreateUnbounded<(string EventName, string EventData)>();
await Task.WhenAll(ConsumerAsync(), InternalProducerAsync(), ExternalProducerAsync());
async Task ConsumerAsync()
{
await foreach (var (eventName, eventData) in channel.Reader.ReadAllAsync())
{
await Response.WriteAsync($"event: {eventName}\n");
await Response.WriteAsync($"data: {eventData}\n\n");
}
}
async Task InternalProducerAsync()
{
while (true)
{
if (!channel.Writer.TryWrite(("internalinfo", internalService.GenerateData(beginResponse.secret)))
break;
await Task.Delay(500);
}
}
async Task ExternalProducerAsync()
{
try
{
while (true)
{
var externalResponse = await externalService.FetchInfo(beginResponse.orderId)
await channel.Writer.WriteAsync(("externalinfo", externalResponse);
if ("DONE".Equals(externalResponse))
{
channel.Writer.Complete();
return;
}
await Task.Delay(500);
}
}
catch (Exception ex)
{
channel.Writer.Complete(ex);
}
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Stephen Cleary |
