ForEachAsync with Result

I'm trying to change Stephen Toub's ForEachAsync<T> extension method into an extension which returns a result...

Stephen's extension:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

My approach (not working: the tasks get executed, but the result is wrong):

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition 
            return default(TResult);
        }));
}

I know I somehow have to return the results from the last part (maybe with WhenAll?), but I haven't figured out how to do it yet.

Update: The result I get is just degreeOfParallelism nulls (I guess because of default(TResult)), even though all the tasks get executed. I also tried return await body(...) inside the loop; then the results were correct, but only degreeOfParallelism tasks got executed.



Solution 1:[1]

Your LINQ query can only ever have the same number of results as the number of partitions - you're just projecting each partition into a single result.
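To make this concrete, here is a minimal sketch that keeps the question's query shape (one Task.Run per partition) but simply counts the items each partition visits instead of calling body. The class and method names are hypothetical. Every element is visited, yet Task.WhenAll can only produce one value per partition.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionCountDemo
{
    // Hypothetical helper mirroring the question's query: one Task.Run per partition.
    public static async Task<int[]> OneResultPerPartition(int itemCount, int dop)
    {
        var source = Enumerable.Range(0, itemCount).ToList();
        return await Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(() =>
            {
                int processed = 0;
                using (partition)
                    while (partition.MoveNext())
                        processed++;   // every element is visited...
                return processed;      // ...but each partition yields a single value
            }));
    }
}
```

Calling OneResultPerPartition(10, 3) yields an array of length 3 whose values add up to 10; how the 10 items are split among the three partitions can vary from run to run.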

If you don't care about the order, you just need to assemble the results of each partition into a list, then flatten them afterwards.

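using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Extension methods must be declared inside a non-generic static class.
public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    // One task per partition; each task collects the results of its own partition.
    var lists = await Task.WhenAll<List<TResult>>(
        Partitioner.Create(source).GetPartitions(degreeOfParallelism)
            .Select(partition => Task.Run<List<TResult>>(async () =>
            {
                var list = new List<TResult>();
                using (partition)
                {
                    while (partition.MoveNext())
                    {
                        list.Add(await body(partition.Current));
                    }
                }
                return list;
            })));
    // Flatten the per-partition lists (source order is not guaranteed).
    return lists.SelectMany(list => list).ToArray();
}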

(I've renamed this from ForEachAsync, as ForEach sounds imperative (suitable for the Func<T, Task> in the original) whereas this is fetching results. A foreach loop doesn't have a result - this does.)
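If you do care about the order, one option (a sketch, not part of the original answer) is to partition the indexes 0..Count-1 instead of the items themselves and write each result into a preallocated array slot. OrderedParallelExtensions and ExecuteInParallelOrdered are hypothetical names, and the sketch assumes the source is an IList<T> so elements can be read by index.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedParallelExtensions
{
    // Hypothetical variant of ExecuteInParallel that keeps results in source order.
    public static async Task<TResult[]> ExecuteInParallelOrdered<T, TResult>(
        this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
    {
        // One slot per source element; each index is written by exactly one task.
        var results = new TResult[source.Count];
        var indexes = Enumerable.Range(0, source.Count);
        await Task.WhenAll(
            Partitioner.Create(indexes).GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            int i = partition.Current;
                            results[i] = await body(source[i]);
                        }
                    }
                })));
        // WhenAll completes only after every partition has finished writing.
        return results;
    }
}
```

Because each task writes to distinct array elements, no lock is needed, and the array can be returned as-is once all partitions have completed.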

Solution 2:[2]

Now that the Parallel.ForEachAsync API has become part of the standard libraries (.NET 6), it makes sense to implement a variant that returns a Task<TResult[]>, based on this API. Here is an implementation:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Declare inside a non-generic static class.
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    var results = new List<TResult>();
    // Pre-size the list when the count is cheap to obtain (.NET 6+).
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    // Pair each element with its position so results can be stored in source order.
    var withIndexes = source.Select((item, index) => (item, index));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        var (item, index) = entry;
        var result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            // Grow the list with placeholders until the slot for this index exists.
            while (results.Count <= index) results.Add(default);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        if (t.IsCanceled)
        {
            // Propagate the correct token
            CancellationToken ct = default;
            try { t.GetAwaiter().GetResult(); }
            catch (OperationCanceledException oce) { ct = oce.CancellationToken; }
            return Task.FromCanceled<TResult[]>(ct);
        }
        if (t.IsFaulted)
        {
            // Propagate all exceptions, not just the first one
            var tcs = new TaskCompletionSource<TResult[]>();
            tcs.SetException(t.Exception.InnerExceptions);
            return tcs.Task;
        }
        lock (results) return Task.FromResult(results.ToArray());
    }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default)
        .Unwrap();
}

This implementation supports all the options and functionality of the Parallel.ForEachAsync overload that takes an IEnumerable<T> source. Its behavior in case of errors and cancellation is identical to that overload's. The results are arranged in the same order as the corresponding elements of the source sequence.
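When the source is already an IList<T>, a shorter sketch is possible (assuming .NET 6+): run Parallel.ForEachAsync over the index range and write into a preallocated array, so neither the lock nor the placeholder growth is needed. ParallelListExtensions and ForEachListAsync are hypothetical names. One trade-off: because it simply awaits Parallel.ForEachAsync, if several bodies throw, the returned task carries only the first exception, whereas the ContinueWith-based version above forwards all of them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelListExtensions
{
    // Hypothetical helper for sources that support indexing.
    public static async Task<TResult[]> ForEachListAsync<TSource, TResult>(
        this IList<TSource> source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask<TResult>> body)
    {
        // The count is known up front, so allocate the final array directly.
        var results = new TResult[source.Count];
        // Each iteration writes a distinct slot, so no lock is needed.
        await Parallel.ForEachAsync(Enumerable.Range(0, source.Count), parallelOptions,
            async (i, ct) => results[i] = await body(source[i], ct).ConfigureAwait(false));
        return results;
    }
}
```

Usage would look like words.ForEachListAsync<string, int>(new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (w, ct) => { await Task.Delay(10, ct); return w.Length; }), which returns the lengths in the same order as words.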

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2