并行运行异步方法8次

Ste*_*ütt 7 .net c# parallel-processing .net-4.5

如何将以下内容转换为Parallel.ForEach?

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    foreach (String url in threads)
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }
        progressBar1.PerformStep();
    }
}
Run Code Online (Sandbox Code Playgroud)

我把它编程为假设异步和并行处理是相同的,我只是意识到它不是.我看了一下我能找到的所有问题,而且我真的无法找到一个为我做的例子.他们中的大多数缺乏可读的变量名称 使用单字母变量名称不能解释它们包含的内容是一种说明示例的可怕方式.

我通常在名为threads的数组中包含300到2000个条目(包含论坛线程的URL),看起来并行处理(由于许多HTTP请求)会加快执行速度.

在使用Parallel.ForEach之前,是否必须删除所有异步(我在foreach之外没有任何异步,只有变量定义)?我应该怎么做呢?我可以不阻塞主线程吗?

我顺便使用.NET 4.5.

Ste*_*ary 7

Stephen Toub有一篇关于实施aForEachAsync好博文.对于Dataflow可用的平台,Svick的答案非常好.

这是另一种选择,使用TPL中的分区程序:

public static Task ForEachAsync<T>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task> body)
{
  var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
  var tasks = partitions.Select(async partition =>
  {
    using (partition) 
      while (partition.MoveNext()) 
        await body(partition.Current); 
  });
  return Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)

然后你可以这样使用:

public async Task getThreadContentsAsync(String[] threads)
{
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await threads.ForEachAsync(8, async url =>
  {
    HttpResponseMessage response = await client.GetAsync(url);
    String content = await response.Content.ReadAsStringAsync();
    String user;
    foreach (Match match in regex.Matches(content))
    {
      user = match.Groups[1].ToString();
      usernames.TryAdd(user, null);
    }
    progressBar1.PerformStep();
  });
}
Run Code Online (Sandbox Code Playgroud)


svi*_*ick 6

我在假设异步和并行处理相同的情况下对其进行编码

异步处理和并行处理是完全不同的.如果您不理解这种差异,我认为您应该首先阅读更多相关信息(例如,c#中的异步和并行编程之间的关系是什么?).

现在,你想要做的事情实际上并不那么简单,因为你想要异步处理一个大集合,具有特定的并行度(8).使用同步处理,您可以使用Parallel.ForEach()(以及ParallelOptions配置并行度),但没有简单的替代方法可以使用async.

在您的代码中,由于您希望所有内容都在UI线程上执行,因此这很复杂.(尽管理想情况下,您不应该直接从计算中访问UI.相反,您应该使用IProgress,这意味着代码不再需要在UI线程上执行.)

在.Net 4.5中执行此操作的最佳方法可能是使用TPL Dataflow.它ActionBlock完全符合您的要求,但它可能非常冗长(因为它比您需要的更灵活).因此,创建一个辅助方法是有意义的:

public static Task AsyncParallelForEach<T>(
    IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}
Run Code Online (Sandbox Code Playgroud)

在你的情况下,你会像这样使用它:

await AsyncParallelForEach(
    threads, async url => await DownloadUrl(url), 8,
    TaskScheduler.FromCurrentSynchronizationContext());
Run Code Online (Sandbox Code Playgroud)

这里,DownloadUrl()是一个async Task处理单个URL(循环体)的方法,8是并行度(可能不应该是实际代码中的文字常量),FromCurrentSynchronizationContext()并确保代码在UI线程上执行.