ConcurrentDictionary GetOrAdd async

I want to use something like GetOrAdd with a ConcurrentDictionary as a cache to a webservice. Is there an async version of this dictionary? GetOrAdd will be making a web request using HttpClient, so it would be nice if there was a version of this dictionary where GetOrAdd was async.

To clear up some confusion, the contents of the dictionary will be the response from a call to a webservice.

ConcurrentDictionary<string, Response> _cache
    = new ConcurrentDictionary<string, Response>();

var response = _cache.GetOrAdd("id",
    x => _httpClient.GetAsync(x).GetAwaiter().GetResult());


Solution 1:[1]

GetOrAdd won't become an asynchronous operation, because accessing the value of a dictionary isn't a long-running operation.

What you can do however is simply store tasks in the dictionary, rather than the materialized result. Anyone needing the results can then await that task.

However, you also need to ensure that the operation is only ever started once, and not multiple times. Since GetOrAdd does not guarantee that the value factory runs only once, you also need to add in Lazy:

ConcurrentDictionary<string, Lazy<Task<Response>>> _cache = new ConcurrentDictionary<string, Lazy<Task<Response>>>();

var response = await _cache.GetOrAdd("id",
    url => new Lazy<Task<Response>>(() => _httpClient.GetAsync(url))).Value;
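One caveat the answer doesn't spell out: if the factory's task faults, the faulted Lazy<Task<...>> stays cached, and every later caller observes the same exception. Below is a hedged, self-contained sketch of evicting faulted entries, using string as a stand-in for the Response type and a caller-supplied fetch delegate instead of HttpClient (both are my simplifications, not part of the original answer):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LazyTaskCache
{
    // string stands in for the Response type from the question.
    static readonly ConcurrentDictionary<string, Lazy<Task<string>>> _cache = new();

    public static async Task<string> GetAsync(string key, Func<string, Task<string>> fetch)
    {
        // The Lazy wrapper guarantees fetch(key) starts at most once per entry.
        var lazy = _cache.GetOrAdd(key, k => new Lazy<Task<string>>(() => fetch(k)));
        try
        {
            return await lazy.Value;
        }
        catch
        {
            // Evict the faulted entry (only if it is still ours), so the
            // next caller retries instead of observing a cached exception.
            _cache.TryRemove(KeyValuePair.Create(key, lazy));
            throw;
        }
    }
}
```

The `TryRemove(KeyValuePair<,>)` overload performs a conditional remove (key and value must both match) and is available on .NET 5 and later; on older targets you can cast the dictionary to `ICollection<KeyValuePair<TKey, TValue>>` and call `Remove` for the same effect.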

Solution 2:[2]

The GetOrAdd method is not that great for this purpose. Since it does not guarantee that the factory runs only once, its only benefit is a minor optimization (minor because additions are rare anyway): it doesn't need to hash the key and find the correct bucket twice, which would happen if you get and set with two separate calls.

I would suggest that you check the cache first, if you do not find the value in the cache, then enter some form of critical section (lock, semaphore, etc.), re-check the cache, if still missing then fetch the value and insert into the cache.

This ensures that your backing store is only hit once; even if multiple requests get a cache miss at the same time, only the first one will actually fetch the value, the other requests will await the semaphore and then return early since they re-check the cache in the critical section.

Pseudo code (using SemaphoreSlim with a count of 1, since you can await it asynchronously):

async Task<TResult> GetAsync(TKey key)
{
    // Try to fetch from cache
    if (cache.TryGetValue(key, out var result)) return result;

    // Get some resource lock here, for example use SemaphoreSlim 
    // which has async wait function:
    await semaphore.WaitAsync();    
    try 
    {
        // Try to fetch from cache again now that we have entered 
        // the critical section
        if (cache.TryGetValue(key, out result)) return result;

        // Fetch data from source (using your HttpClient or whatever), 
        // update your cache and return.
        return cache[key] = await FetchFromSourceAsync(...);
    }
    finally
    {
        semaphore.Release();
    }
}
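One trade-off in the pseudocode above: the single SemaphoreSlim serializes cache misses across all keys, so one slow fetch delays misses for every other key. A common variation stripes the gate per key with a second ConcurrentDictionary of semaphores. A self-contained sketch (class and member names are mine, not from the answer):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class PerKeyCache<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, TValue> _cache = new();
    // One gate per key, so a miss on "a" does not block a miss on "b".
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _locks = new();

    public async Task<TValue> GetAsync(TKey key, Func<TKey, Task<TValue>> fetch)
    {
        if (_cache.TryGetValue(key, out var value)) return value;

        var gate = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        await gate.WaitAsync();
        try
        {
            // Re-check inside the critical section: another caller may have
            // populated the cache while we were waiting on the gate.
            if (_cache.TryGetValue(key, out value)) return value;
            value = await fetch(key);
            _cache[key] = value;
            return value;
        }
        finally
        {
            gate.Release();
        }
    }
}
```

Note the trade-off this sketch does not address: the per-key semaphores are never evicted, so an unbounded key space grows the `_locks` dictionary without limit.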

Solution 3:[3]

Try this extension method:

/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function 
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
    this ConcurrentDictionary<TKey,TResult> dict,
    TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
    if (dict.TryGetValue(key, out TResult resultingValue))
    {
        return resultingValue;
    }
    var newValue = await asyncValueFactory(key);
    return dict.GetOrAdd(key, newValue);
}

Instead of dict.GetOrAdd(key, key => something(key)), you use await dict.GetOrAddAsync(key, async key => await something(key)). Obviously, in this situation you can just write it as await dict.GetOrAddAsync(key, something), but I wanted to make it clear.

In regards to concerns about preserving the order of operations, I have the following observations:

  1. Using the normal GetOrAdd has the same effect if you look at the way it is implemented. I literally used the same code and made it work for async. The reference documentation says:

the valueFactory delegate is called outside the locks to avoid the problems that can arise from executing unknown code under a lock. Therefore, GetOrAdd is not atomic with regards to all other operations on the ConcurrentDictionary<TKey,TValue> class

  2. SyncRoot is not supported in ConcurrentDictionary; it uses an internal locking mechanism, so locking on it is not possible. Using your own lock mechanism works only within this extension method, though. If you use another flow (GetOrAdd, for example), you will face the same problem.

Solution 4:[4]

Probably using a dedicated memory cache (like the new or the old MemoryCache classes, or a third-party library) is preferable to using a plain ConcurrentDictionary, unless you don't need commonly used functionality like time-based expiration, size-based compacting, or automatic eviction of entries that depend on other expired entries or on mutable external resources (files, databases etc.). It should be noted, though, that MemoryCache may still need some work in order to handle asynchronous delegates properly, since its out-of-the-box behavior is not ideal.

Below is a custom extension method GetOrAddAsync for ConcurrentDictionary instances that have Task<TValue> values. It accepts a factory method, and ensures that the method will be invoked at most once. It also ensures that failed tasks are removed from the dictionary.

/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, Task<TValue>> valueFactory)
{
    if (!source.TryGetValue(key, out var currentTask))
    {
        Task<TValue> newTask = null;
        var newTaskTask = new Task<Task<TValue>>(async () =>
        {
            try { return await valueFactory(key).ConfigureAwait(false); }
            catch
            {
                source.TryRemove(KeyValuePair.Create(key, newTask));
                throw;
            }
        });
        newTask = newTaskTask.Unwrap();
        currentTask = source.GetOrAdd(key, newTask);
        if (currentTask == newTask) newTaskTask.Start(TaskScheduler.Default);
    }
    return currentTask;
}

Usage example:

var cache = new ConcurrentDictionary<string, Task<HttpResponseMessage>>();

var response = await cache.GetOrAddAsync("https://stackoverflow.com", async url =>
{
    return await _httpClient.GetAsync(url);
});

Overload with synchronous valueFactory delegate:

public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, TValue> valueFactory)
{
    if (!source.TryGetValue(key, out var currentTask))
    {
        Task<TValue> newTask = null;
        newTask = new Task<TValue>(() =>
        {
            try { return valueFactory(key); }
            catch
            {
                source.TryRemove(KeyValuePair.Create(key, newTask));
                throw;
            }
        });
        currentTask = source.GetOrAdd(key, newTask);
        if (currentTask == newTask) newTask.Start(TaskScheduler.Default);
    }
    return currentTask;
}

Both overloads invoke the valueFactory delegate on the ThreadPool, ensuring that the current thread will not be blocked. If you have some reason to prefer invoking the delegate on the current thread, you can just replace Start with RunSynchronously.

For a version of the GetOrAddAsync method that compiles on both the .NET Framework and .NET Core, see the 3rd revision of this answer.

Solution 5:[5]

I solved this years ago, before ConcurrentDictionary and the TPL were born. I'm in a café and don't have the original code, but it went something like this.

It's not a rigorous answer, but it may inspire your own solution. The important thing is to return the value that was just added (or that already exists) along with a boolean, so you can fork execution.

The design lets you easily fork the race winning logic vs. the losing logic.

public bool TryAddValue(TKey key, TValue value, out TValue contains)
{
    // guards etc.

    while (true)
    {
        if (this.concurrentDic.TryAdd(key, value))
        {
            contains = value;
            return true;
        }
        else if (this.concurrentDic.TryGetValue(key, out var existing))
        {
            contains = existing;
            return false;
        }
        else
        {
            // Slipped down the rare path. The value was removed between the
            // above checks. I think just keep trying because we must have
            // been really unlucky.

            // Note this spinning will cause adds to execute out of
            // order since a very unlucky add on a fast moving collection
            // could in theory be bumped again and again before getting
            // lucky and getting its value added, or locating existing.

            // A tiny random sleep might work. Experiment under load.
        }
    }
}

This could be made into an extension method for ConcurrentDictionary, or be a method on your own lock-based cache.

Perhaps a GetOrAdd(K,V) could be used with an Object.ReferenceEquals() to check if it was added or not, instead of the spin design.

To be honest, the above code isn't the point of my answer. The power comes in the simple design of the method signature and how it affords the following:

static readonly ConcurrentDictionary<string, Task<Task<Thing>>> tasks = new();

//

var newTask = new Task<Task<Thing>>(() => GetThingAsync(thingId));

if (tasks.TryAddValue(thingId, newTask, out var task))
{
    task.Start();
}

var thingTask = await task;
var thing = await thingTask;
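The GetOrAdd-plus-Object.ReferenceEquals alternative mentioned above can be sketched as an extension method (a sketch of mine, not the author's original code). It avoids the retry loop entirely, but only works when TValue is a reference type, since the race is detected by reference identity:

```csharp
using System.Collections.Concurrent;

public static class ConcurrentDictionaryRaceExtensions
{
    // Returns true if this call's value won the race and was added;
    // either way, contains holds the instance actually stored.
    public static bool TryAddValue<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dict,
        TKey key, TValue value, out TValue contains)
        where TValue : class
    {
        contains = dict.GetOrAdd(key, value);
        // GetOrAdd returns the stored instance, so a reference comparison
        // tells us whether our value is the one that got in.
        return ReferenceEquals(contains, value);
    }
}
```

Used with Task<Task<Thing>> values exactly as in the snippet above, only the race winner calls Start(); losers await the winner's stored task.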

It's a little quirky how a Task needs to hold a Task (if your work is async), and there are the allocations of unused Tasks to consider.

I think it's a shame Microsoft didn't ship its thread-safe collection with this method, or extract a "concurrent collection" interface.

My real implementation was a cache with sophisticated expiring inner collections and stuff. I guess you could subclass the .NET Task class and add a CreatedAt property to aid with eviction.

Disclaimer: I've not tried this at all, it's off the top of my head, but I used this sort of design in an ultra-high-throughput app in 2009.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Yepeekai
Solutions 2–5: attribution not preserved in the source.