Dun*_*ken 1 c# concurrency asynchronous task-parallel-library parallel.foreachasync
I'm trying to change Stephen Toub's ForEachAsync<T> extension method into an extension that returns results...
Stephen's extension:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
My approach (doesn't work; the tasks get executed but the result is wrong):
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition
            return default(TResult);
        }));
}
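I know I somehow have to return (WhenAll?) the results from the last part, but I haven't figured out how to do it yet...
Update: even though all tasks get executed, the result I get is just degreeOfParallelism times null (I guess because of default(TResult)). I also tried return await body(...); then the results were fine, but only degreeOfParallelism tasks got executed.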
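The symptom described above follows from each Task.Run producing exactly one TResult per partition, so WhenAll can only ever yield degreeOfParallelism values. One possible way around this, sketched below under my own assumptions, is to pair every element with its index and have each worker write into a preallocated array. The names PartitionedAsync and ForEachWithResultsAsync are hypothetical and chosen only for this illustration; this is not Stephen Toub's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Demo: double each number using at most 3 concurrent workers.
int[] input = Enumerable.Range(1, 10).ToArray();
int[] doubled = await PartitionedAsync.ForEachWithResultsAsync<int, int>(input, 3, async x =>
{
    await Task.Delay(10); // simulated asynchronous work
    return x * 2;
});
Console.WriteLine(string.Join(", ", doubled)); // 2, 4, 6, 8, 10, 12, 14, 16, 18, 20

// Hypothetical helper class, named for this sketch only.
public static class PartitionedAsync
{
    public static async Task<TResult[]> ForEachWithResultsAsync<T, TResult>(
        IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
    {
        // One slot per input element; each slot is written by exactly one worker.
        var results = new TResult[source.Count];

        // Pair every element with its position so the original order can be kept.
        IEnumerable<(T item, int index)> indexed =
            source.Select((item, index) => (item, index));

        await Task.WhenAll(
            Partitioner.Create(indexed).GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                        while (partition.MoveNext())
                        {
                            var (item, index) = partition.Current;
                            TResult value = await body(item);
                            results[index] = value; // store instead of returning
                        }
                })));

        return results;
    }
}
```

Because every worker stores its results by index instead of returning a single value, all elements are processed and the output keeps the input order. The sketch does no argument validation and offers no cancellation support.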
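Now that the Parallel.ForEachAsync API has become part of the standard libraries (.NET 6), it makes sense to implement a variant that returns a Task<TResult[]> on top of this API. Here is an implementation that targets .NET 8: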
// Namespaces needed beyond the implicit usings:
// System.Diagnostics, System.Runtime.InteropServices

/// <summary>
/// Executes a foreach loop on an enumerable sequence, in which iterations may run
/// in parallel, and returns the results of all iterations in the original order.
/// </summary>
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    List<TResult> results = new();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        (TSource item, int index) = entry;
        TResult result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            // Grow the list as needed so that results[index] is a valid slot.
            if (index >= results.Count)
                CollectionsMarshal.SetCount(results, index + 1);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        // Propagate the outcome (result, all errors, or cancellation) unchanged.
        TaskCompletionSource<TResult[]> tcs = new();
        switch (t.Status)
        {
            case TaskStatus.RanToCompletion:
                lock (results) tcs.SetResult(results.ToArray()); break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions); break;
            case TaskStatus.Canceled:
                tcs.SetCanceled(new TaskCanceledException(t).CancellationToken); break;
            default: throw new UnreachableException();
        }
        Debug.Assert(tcs.Task.IsCompleted);
        return tcs.Task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
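This implementation supports all the options and functionality of the Parallel.ForEachAsync overload that takes an IEnumerable<T> as source. Its behavior in case of errors and cancellation is identical. The results are arranged in the same order as the associated elements in the source sequence.
CollectionsMarshal.SetCount is an advanced API introduced in .NET 8. It changes the Count of a List<T>, exposing uninitialized data when the count is increased. For a less modern (and slightly less performant) approach that runs on .NET 6, see the fifth revision of this answer.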
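For illustration, here is a minimal sketch of how the same idea could look without CollectionsMarshal.SetCount, using only APIs available in .NET 6. It is my own simplification and not necessarily what the revision mentioned above contains. The class name ParallelResults is hypothetical. The list is padded with default values instead of calling SetCount, and the Parallel.ForEachAsync task is simply awaited, so if several iterations fail the returned task carries only the first exception (the ContinueWith version above preserves all of them).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Demo: compute the length of each word with at most 2 concurrent iterations.
string[] words = { "alpha", "beta", "gamma", "delta" };
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
int[] lengths = await ParallelResults.ForEachAsync<string, int>(words, options, async (word, ct) =>
{
    await Task.Delay(10, ct); // simulated asynchronous work
    return word.Length;
});
Console.WriteLine(string.Join(", ", lengths)); // 5, 4, 5, 5

// Hypothetical helper class, named for this sketch only.
public static class ParallelResults
{
    public static async Task<TResult[]> ForEachAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask<TResult>> body)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(parallelOptions);
        ArgumentNullException.ThrowIfNull(body);

        List<TResult> results = new();
        IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));

        await Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
        {
            (TSource item, int index) = entry;
            TResult result = await body(item, ct).ConfigureAwait(false);
            lock (results)
            {
                // Pad with default values until the slot for this index exists.
                while (results.Count <= index) results.Add(default!);
                results[index] = result;
            }
        }).ConfigureAwait(false);

        // All iterations have completed here, so no lock is needed.
        return results.ToArray();
    }
}
```

The padding loop costs one Add call per missing slot, which is the kind of overhead that SetCount avoids.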