包含异步和同步的并行循环

Mog*_*og0 0 c# parallel-processing plinq async-await parallel.foreach

我有一个需要并行运行的循环,因为每次迭代都很慢并且需要大量处理器,但我还需要调用异步方法作为循环中每次迭代的一部分。

我见过关于如何在循环中处理异步方法的问题,但没有看到关于异步和同步的组合的问题,这就是我所得到的。

我的(简化的)代码如下 - 我知道由于异步操作被传递给 foreach,这将无法正常工作。

protected IDictionary<int, ReportData> GetReportData()
{
    var results = new ConcurrentDictionary<int, ReportData>();
      
    Parallel.ForEach(requestData, async data =>
    {
        // process data synchronously
        var processedData = ProcessData(data);

        // get some data async
        var reportRequest = await BuildRequestAsync(processedData);

        // synchronous building
        var report = reportRequest.BuildReport();

        results.TryAdd(data.ReportId, report);
     });

     // This needs to be populated before returning
     return results;
}
Run Code Online (Sandbox Code Playgroud)

当操作必须异步才能等待单个异步调用时,有什么方法可以并行执行操作。

将同步函数转换为异步函数并不是一个实用的选择。

我不想将操作拆分并使用 Parallel.ForEach ,然后使用 WhenAll 和另一个 Parallel.ForEach 进行异步调用,因为每个阶段的速度在不同迭代之间可能差异很大,因此拆分操作效率较低,因为速度越快那些人会等待较慢的人然后再继续。

我确实想知道是否可以使用 PLINQ ForAll 来代替 Parallel.ForEach,但从未使用过 PLINQ,并且不确定它是否会在返回之前等待所有迭代完成,即任务是否仍在运行结束的过程。

Ste*_*ary 5

当操作必须异步才能等待单个异步调用时,有什么方法可以并行执行操作。

是的,但是您需要了解Parallel当您采取其他方法时,是什么给您带来了损失。具体来说,Parallel会自动确定合适的线程数并根据使用情况进行调整。

将同步函数转换为异步函数并不是一个实用的选择。

对于 CPU 密集型方法,您不应转换它们。

我不想将操作拆分并使用 Parallel.ForEach ,然后使用 WhenAll 和另一个 Parallel.ForEach 进行异步调用,因为每个阶段的速度在不同迭代之间可能差异很大,因此拆分操作效率较低,因为速度越快那些人会等待较慢的人然后再继续。

我提出的第一个建议是研究TPL Dataflow。它允许您定义某种“管道”,保持数据流动,同时限制每个阶段的并发性。

我确实想知道是否可以使用 PLINQ ForAll 代替 Parallel.ForEach

Parallel不会。PLINQ 的工作方式非常相似。它们在 CPU 利用率方面存在一些差异,并且存在一些 API 差异 - 例如,如果您最终有一组结果,PLINQ 通常更干净Parallel- 但从高级视图来看,它们非常相似。两者都只适用于同步代码。

但是,您可以使用简单的Task.Runwith Task.WhenAll

protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
  var tasks = requestData.Select(async data => Task.Run(() =>
  {
    // process data synchronously
    var processedData = ProcessData(data);

    // get some data async
    var reportRequest = await BuildRequestAsync(processedData);

    // synchronous building
    var report = reportRequest.BuildReport();

    return (Key: data.ReportId, Value: report);
  })).ToList();
  var results = await Task.WhenAll(tasks);
  return results.ToDictionary(x => x.Key, x => x.Value);
}
Run Code Online (Sandbox Code Playgroud)

您可能需要应用并发限制(这Parallel对您来说已经足够了)。在异步世界中,这看起来像:

protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
  var throttle = new SemaphoreSlim(10);
  var tasks = requestData.Select(data => Task.Run(async () =>
  {
    await throttle.WaitAsync();
    try
    {
      // process data synchronously
      var processedData = ProcessData(data);

      // get some data async
      var reportRequest = await BuildRequestAsync(processedData);

      // synchronous building
      var report = reportRequest.BuildReport();

      return (Key: data.ReportId, Value: report);
    }
    finally
    {
      throttle.Release();
    }
  })).ToList();
  var results = await Task.WhenAll(tasks);
  return results.ToDictionary(x => x.Key, x => x.Value);
}
Run Code Online (Sandbox Code Playgroud)

  • 我想你可能犯了一个小错误。异步应该位于 Task.Run 内的 lambda 上,而不是位于 Select 内的 lambda 上。等待全部在 Task.Run() 的 lambda 上 (2认同)