IO绑定操作的并行执行

ozg*_*gur 14 c# task task-parallel-library

我已阅读TPL和任务库文档封面.但是,我仍然无法清楚地理解以下案例,现在我需要实施它.

我会简化我的情况.我IEnumerable<Uri>的长度是1000.我必须使用它来请求它们HttpClient.

我有两个问题.

  1. 没有太多的计算,只是在等待HTTP请求.在这种情况下我还可以使用Parallel.Foreach()吗?
  2. 在使用的情况下,Task创建大量的最佳实践是什么?假设我使用Task.Factory.StartNew()并将这些任务添加到列表中并等待所有这些任务.是否有一个功能(如TPL分区程序)控制最大任务的数量和HttpClient我可以创建的最大值?

在SO上有几个类似的问题,但没有人提到最大值.该要求仅使用具有最大HttpClient的最大任务.

先感谢您.

Ste*_*ary 25

i3arnon对TPL Dataflow的回答很好; 如果您混合使用CPU和I/O绑定代码,则数据流非常有用.我会回应他Parallel为CPU绑定代码设计的情绪; 它不是基于I/O的代码的最佳解决方案,尤其不适合异步代码.

如果您想要一个适用于大多数I/O代码的替代解决方案 - 并且不需要外部库 - 您正在寻找的方法是Task.WhenAll:

var tasks = uris.Select(uri => SendRequestAsync(uri)).ToArray();
await Task.WhenAll(tasks);
Run Code Online (Sandbox Code Playgroud)

这是最简单的解决方案,但它确实具有同时启动所有请求的缺点.特别是如果所有请求都转到相同的服务(或一小组服务),这可能会导致超时.要解决这个问题,你需要使用某种限制......

是否有一个功能(如TPL分区程序)控制最大任务的数量和我可以创建的最大HttpClient?

TPL Dataflow具有很好的功能MaxDegreeOfParallelism,一次只启动这么多.您还可以使用另一个内置来限制常规异步代码SemaphoreSlim:

private readonly SemaphoreSlim _sem = new SemaphoreSlim(50);
private async Task SendRequestAsync(Uri uri)
{
  await _sem.WaitAsync();
  try
  {
    ...
  }
  finally
  {
    _sem.Release();
  }
}
Run Code Online (Sandbox Code Playgroud)

如果使用Task,创建大量的最佳实践是什么?假设我使用Task.Factory.StartNew()并将这些任务添加到列表中并等待所有这些任务.

你实际上不想使用StartNew.它只有一个适当的用例(基于动态任务的并行性),这是非常罕见的.Task.Run如果您需要将工作推送到后台线程,则应使用现代代码.但是你甚至不需要开头,所以这里既不适合StartNew也不Task.Run适合.

在SO上有几个类似的问题,但没有人提到最大值.该要求仅使用具有最大HttpClient的最大任务.

最大值是异步代码真正变得棘手的地方.使用CPU绑定(并行)代码,解决方案显而易见:您使用尽可能多的线程.(好吧,至少你可以那里开始并根据需要进行调整).使用异步代码,没有明显的解决方案.这取决于很多因素 - 你有多少内存,远程服务器如何响应(速率限制,超时等)等.

这里没有简单的解决方案.您只需要测试您的特定应用程序如何处理高级别的并发性,然后限制到较低的数字.


我有一些幻灯片可以解释何时适合不同的技术(并行,异步,TPL数据流和Rx).如果您更喜欢书面描述和食谱,我想您可能会从我的并发书中受益.

  • 当你说没有简单的解决方案时,它就结束了我的痛苦.我当时认为可能有办法做到这一点而且日夜都在寻找.现在我可以尝试为我自己的情况实现一些特定的东西.非常感谢你. (3认同)

i3a*_*non 20

在这种情况下我还可以使用Parallel.Foreach吗?

这不太合适.Parallel.Foreach更多的是CPU密集型工作.它也不支持异步操作.

在使用的情况下,Task创建大量的最佳实践是什么?

请改用TPL Dataflow块.您不会创建大量等待线程可用的任务.您可以配置最大任务量,并将其重用于同时位于缓冲区中等待任务的所有项目.例如:

var block = new ActionBlock<Uri>(
    uri => SendRequestAsync(uri),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

foreach (var uri in uris)
{
    block.Post(uri);
}

block.Complete();
await block.Completion;
Run Code Online (Sandbox Code Playgroud)