ForEachAsync with results

Dun*_*ken 1 c# concurrency asynchronous task-parallel-library parallel.foreachasync

I am trying to change Stephen Toub's ForEachAsync<T> extension method into an extension that returns results...

Stephen's extension:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

My method (it doesn't work; the tasks are executed, but the results are wrong):

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition 
            return default(TResult);
        }));
}

I know that I somehow have to return the results in the last part (WhenAll?), but I haven't figured out how yet...

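Update: even though all the tasks are executed, the only result I get is degreeOfParallelism times null (because of default(TResult), I guess). I also tried return await body(...); the results were correct, but only degreeOfParallelism tasks were executed.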
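Both symptoms come from the same place: each of the degreeOfParallelism partition tasks processes many items in its while loop, but can return only one value. Returning default(TResult) therefore yields degreeOfParallelism default values (null for a reference type), and return await body(...) leaves the loop after the first item of each partition. Below is a minimal sketch of one way to keep the Partitioner-based design, assuming it is acceptable to buffer each partition's results in a local list: every item is paired with its source index, each partition returns all of its (index, result) pairs, and the combined pairs are sorted back into source order. The class name PartitionedForEachExtensions is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class; a sketch, not a library API.
public static class PartitionedForEachExtensions
{
    public static async Task<TResult[]> ForEachAsync<T, TResult>(
        this IEnumerable<T> source, int degreeOfParallelism,
        Func<T, Task<TResult>> body)
    {
        // Pair each item with its position so the original order can be restored.
        var indexed = source.Select((item, index) => (item, index));

        var partitionTasks =
            from partition in Partitioner.Create(indexed).GetPartitions(degreeOfParallelism)
            select Task.Run(async () =>
            {
                // Collect every result of this partition, not just one.
                var local = new List<(int Index, TResult Result)>();
                using (partition)
                {
                    while (partition.MoveNext())
                    {
                        var (item, index) = partition.Current;
                        local.Add((index, await body(item)));
                    }
                }
                return local;
            });

        // One list per partition; flatten them and restore the source order,
        // because each partition receives an arbitrary subset of the items.
        List<(int Index, TResult Result)>[] perPartition = await Task.WhenAll(partitionTasks);
        return perPartition
            .SelectMany(list => list)
            .OrderBy(pair => pair.Index)
            .Select(pair => pair.Result)
            .ToArray();
    }
}
```

Usage follows the original signature, e.g. source.ForEachAsync(4, async item => await SomeWorkAsync(item)), where SomeWorkAsync stands for any Task<TResult>-returning method.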

The*_*ias 8

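Now that the Parallel.ForEachAsync API has become part of the standard library (.NET 6), it makes sense to implement a variant that returns a Task<TResult[]> on top of it. Here is an implementation targeting .NET 8: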

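using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

/// <summary>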
/// Executes a foreach loop on an enumerable sequence, in which iterations may run
/// in parallel, and returns the results of all iterations in the original order.
/// </summary>
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    List<TResult> results = new();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        (TSource item, int index) = entry;
        TResult result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            if (index >= results.Count)
                CollectionsMarshal.SetCount(results, index + 1);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        TaskCompletionSource<TResult[]> tcs = new();
        switch (t.Status)
        {
            case TaskStatus.RanToCompletion:
                lock (results) tcs.SetResult(results.ToArray()); break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions); break;
            case TaskStatus.Canceled:
                tcs.SetCanceled(new TaskCanceledException(t).CancellationToken); break;
            default: throw new UnreachableException();
        }
        Debug.Assert(tcs.Task.IsCompleted);
        return tcs.Task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

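This implementation supports all the options and features of the Parallel.ForEachAsync overload that takes an IEnumerable<T> as its source. Its behavior on errors and on cancellation is the same. The results are returned in the same order as the corresponding elements of the source sequence.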

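CollectionsMarshal.SetCount is an advanced API introduced in .NET 8. It changes the Count of a List<T>, and when the count is increased it exposes uninitialized data. For a less modern (and slightly less performant) approach that runs on .NET 6, see the fifth revision of this answer.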
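The following is not the code from the fifth revision mentioned above, only a minimal sketch of a simpler variant that avoids CollectionsMarshal.SetCount and should compile on .NET 6. It buffers the source into an array up front, preallocates a TResult[] of the same length, and lets each iteration write to its own slot, so no lock is needed. The class name ParallelHelpers is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class; a sketch, not the answer's implementation.
public static class ParallelHelpers
{
    public static async Task<TResult[]> ForEachAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask<TResult>> body)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(parallelOptions);
        ArgumentNullException.ThrowIfNull(body);

        // Eager enumeration: the whole source is buffered before any work starts.
        TSource[] items = source.ToArray();
        TResult[] results = new TResult[items.Length];

        await Parallel.ForEachAsync(Enumerable.Range(0, items.Length), parallelOptions,
            async (index, ct) =>
            {
                // Each index is processed exactly once, so writes never collide.
                results[index] = await body(items[index], ct).ConfigureAwait(false);
            }).ConfigureAwait(false);

        // Completion of Parallel.ForEachAsync means every slot has been written.
        return results;
    }
}
```

The trade-offs compared to the answer's implementation: the source is enumerated completely before any iteration runs, and because the result task is produced by await instead of the ContinueWith/TaskCompletionSource step, a faulted result carries only the first exception rather than all of them. Canceling parallelOptions.CancellationToken should still end with a canceled task.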