Parallel.ForEach 与异步 lambda 等待所有迭代完成

ldr*_*vic 2 c# task-parallel-library parallel.foreach

最近我看到了几个与 Parallel.ForEach 相关的 SO 线程与异步 lambdas 混合,但所有建议的答案都是某种解决方法。

有什么办法可以写:

List<int> list = new List<int>[]();

Parallel.ForEach(arrayValues, async (item) =>
{
  var x = await LongRunningIoOperationAsync(item);
  list.Add(x);
});
Run Code Online (Sandbox Code Playgroud)

如何确保列表将包含在每次迭代中使用 lambda 执行的所有迭代中的所有项目?

Parallel.ForEach 通常如何与异步 lambdas 一起工作,如果它命中等待,它会将其线程移交给下一次迭代吗?

我认为 ParallelLoopResult IsCompleted 字段不正确,因为它会在执行所有迭代时返回 true,无论它们的实际 lambda 作业是否完成?

Ste*_*ary 7

最近我看到了几个与 Parallel.ForEach 相关的 SO 线程与异步 lambdas 混合,但所有建议的答案都是某种解决方法。

嗯,那是因为Parallel 不适用于async. 从不同的角度来看,您首先为什么要混合它们?他们做相反的事情Parallel都是关于添加线程和async所有关于放弃线程。如果您想同时进行异步工作,请使用Task.WhenAll. 这是完成这项工作的正确工具;Parallel不是。

也就是说,听起来您想使用错误的工具,所以这是您的操作方法...

如何确保列表将包含在每次迭代中使用 lambda 执行的所有迭代中的所有项目?

您需要有某种信号,某些代码可以在处理完成之前阻止该信号,例如,CountdownEventMonitor。附带说明一下,您还需要保护对非线程安全的访问List<T>

Parallel.ForEach 通常如何与异步 lambdas 一起工作,如果它命中等待,它会将其线程移交给下一次迭代吗?

由于Parallel不理解asynclambda,当第一个await产生(返回)给它的调用者时,Parallel将假定循环的交互已经完成。

我认为 ParallelLoopResult IsCompleted 字段不正确,因为它会在执行所有迭代时返回 true,无论它们的实际 lambda 作业是否完成?

正确的。据Parallel了解,它只能“看到”await返回给调用者的第一个方法。所以它不知道asynclambda何时完成。它还会假设迭代过早完成,这会导致分区中断。


AAA*_*ddd 5

您不需要Parallel.For/ForEach在这里,您只需要等待任务列表即可。

背景

简而言之,您需要非常小心async lambdas,如果您将它们传递给ActionorFunc<Task>

您的问题是因为Parallel.For / ForEach不适合异步和等待模式IO 绑定任务。它们适合CPU 密集型工作负载。这意味着它们本质上有Action参数,让任务调度程序为您创建任务

如果您想同时运行多个异步Task.WhenAll任务,请使用 或TPL 数据 流块(或类似的东西),它可以有效地处理CPU 密集型IO 密集型工作负载,或者更直接地说,它们可以处理以下任务:什么是异步方法

除非您需要在 lambda 内部执行更多操作(您尚未显示),否则只需使用SelectandWhenAll

var tasks = items .Select(LongRunningIoOperationAsync);
var results = await Task.WhenAll(tasks); // here is your list of int
Run Code Online (Sandbox Code Playgroud)

如果这样做,您仍然可以使用等待,

var tasks = items.Select(async (item) =>
   {
       var x = await LongRunningIoOperationAsync(item);
       // do other stuff
       return x;
   });

var results = await Task.WhenAll(tasks);
Run Code Online (Sandbox Code Playgroud)

注意:如果您需要扩展功能Parallel.ForEach(即控制最大并发的选项),有多种方法,但是 RX 或 DataFlow 可能是最简洁的

  • @ldragicevic 默认任务调度程序将根据工作负载、核心和启发式方法注意限制并发性 (2认同)