Mog*_*og0 0 c# parallel-processing plinq async-await parallel.foreach
我有一个需要并行运行的循环,因为每次迭代都很慢并且需要大量处理器,但我还需要调用异步方法作为循环中每次迭代的一部分。
我见过关于如何在循环中处理异步方法的问题,但没有看到关于异步和同步的组合的问题,这就是我所得到的。
我的(简化的)代码如下 - 我知道由于异步操作被传递给 foreach,这将无法正常工作。
protected IDictionary<int, ReportData> GetReportData()
{
var results = new ConcurrentDictionary<int, ReportData>();
Parallel.ForEach(requestData, async data =>
{
// process data synchronously
var processedData = ProcessData(data);
// get some data async
var reportRequest = await BuildRequestAsync(processedData);
// synchronous building
var report = reportRequest.BuildReport();
results.TryAdd(data.ReportId, report);
});
// This needs to be populated before returning
return results;
}
Run Code Online (Sandbox Code Playgroud)
当操作必须异步才能等待单个异步调用时,有什么方法可以并行执行操作。
将同步函数转换为异步函数并不是一个实用的选择。
我不想将操作拆分并使用 Parallel.ForEach ,然后使用 WhenAll 和另一个 Parallel.ForEach 进行异步调用,因为每个阶段的速度在不同迭代之间可能差异很大,因此拆分操作效率较低,因为速度越快那些人会等待较慢的人然后再继续。
我确实想知道是否可以使用 PLINQ ForAll 来代替 Parallel.ForEach,但从未使用过 PLINQ,并且不确定它是否会在返回之前等待所有迭代完成,即任务是否仍在运行结束的过程。
当操作必须异步才能等待单个异步调用时,有什么方法可以并行执行操作。
是的,但是您需要了解Parallel当您采取其他方法时,是什么给您带来了损失。具体来说,Parallel会自动确定合适的线程数并根据使用情况进行调整。
将同步函数转换为异步函数并不是一个实用的选择。
对于 CPU 密集型方法,您不应转换它们。
我不想将操作拆分并使用 Parallel.ForEach ,然后使用 WhenAll 和另一个 Parallel.ForEach 进行异步调用,因为每个阶段的速度在不同迭代之间可能差异很大,因此拆分操作效率较低,因为速度越快那些人会等待较慢的人然后再继续。
我提出的第一个建议是研究TPL Dataflow。它允许您定义某种“管道”,保持数据流动,同时限制每个阶段的并发性。
我确实想知道是否可以使用 PLINQ ForAll 代替 Parallel.ForEach
Parallel不会。PLINQ 的工作方式非常相似。它们在 CPU 利用率方面存在一些差异,并且存在一些 API 差异 - 例如,如果您最终有一组结果,PLINQ 通常更干净Parallel- 但从高级视图来看,它们非常相似。两者都只适用于同步代码。
但是,您可以使用简单的Task.Runwith Task.WhenAll:
protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
var tasks = requestData.Select(async data => Task.Run(() =>
{
// process data synchronously
var processedData = ProcessData(data);
// get some data async
var reportRequest = await BuildRequestAsync(processedData);
// synchronous building
var report = reportRequest.BuildReport();
return (Key: data.ReportId, Value: report);
})).ToList();
var results = await Task.WhenAll(tasks);
return results.ToDictionary(x => x.Key, x => x.Value);
}
Run Code Online (Sandbox Code Playgroud)
您可能需要应用并发限制(这Parallel对您来说已经足够了)。在异步世界中,这看起来像:
protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
var throttle = new SemaphoreSlim(10);
var tasks = requestData.Select(data => Task.Run(async () =>
{
await throttle.WaitAsync();
try
{
// process data synchronously
var processedData = ProcessData(data);
// get some data async
var reportRequest = await BuildRequestAsync(processedData);
// synchronous building
var report = reportRequest.BuildReport();
return (Key: data.ReportId, Value: report);
}
finally
{
throttle.Release();
}
})).ToList();
var results = await Task.WhenAll(tasks);
return results.ToDictionary(x => x.Key, x => x.Value);
}
Run Code Online (Sandbox Code Playgroud)