.net Core Parallel.ForEach问题

nur*_*guy 6 c# parallel-processing

我已经切换到某些项目的.net Core,现在我遇到了Parallel.ForEach的问题.在过去,我经常有一个id值列表,然后我将用它来发送Web请求以获取完整数据.它看起来像这样:

Parallel.ForEach(myList, l =>
{
    // make web request using l.id 
    // process the data somehow
});
Run Code Online (Sandbox Code Playgroud)

好吧,在.net Core中,必须标记所有Web请求await,这意味着必须标记Parallel.ForEach操作async.但是,将Parallel.ForEach操作标记为async意味着我们有一个void async导致问题的方法.在我的特定情况下,这意味着响应在并行循环中的所有Web请求完成之前返回到应用程序,这既尴尬又导致错误.

问题:在这里使用Parallel.ForEach的替代方法是什么?

我发现一个可能的解决方案是将Parallel循环包装在Task中并等待任务:

await Task.Run(() => Parallel.ForEach(myList, l =>
{
    // stuff here
}));
Run Code Online (Sandbox Code Playgroud)

(在这里找到:Parallel.ForEach vs Task.Run和Task.WhenAll)

但是,这对我不起作用.当我使用它时,我仍然在循环完成之前返回应用程序.

另外一个选项:

var tasks = new List<Task>();
foreach (var l in myList)
{
    tasks.Add(Task.Run(async () =>
    {
         // stuff here
    }));
}
await Task.WhenAll(tasks);
Run Code Online (Sandbox Code Playgroud)

这似乎有效,但这是唯一的选择吗?似乎新的.net Core已经使Parallel.ForEach几乎无用(至少在嵌套Web调用时).

任何帮助/建议表示赞赏.

Evk*_*Evk 20

Parallel.ForEach注释中解释了为什么对此任务不利:它专为CPU绑定(CPU密集型)任务而设计.如果你将它用于IO绑定操作(比如发出web请求) - 你会在等待响应时浪费线程池线程,没有任何好处.它仍然可以使用它,但它不适合这种情况.

你需要的是使用异步Web请求方法(如HttpWebRequest.GetResponseAsync),但这里有另一个问题 - 你不想一次执行所有的web请求(如另一个答案所示).您的列表中可能有数千个网址(ID).因此,您可以使用为此设计的线程同步构造Semaphore.Semaphore就像队列一样 - 它允许X线程通过,其余的应该等到其中一个忙线完成它的工作(稍微简化一下).这是一个例子:

static async Task ProcessUrls(string[] urls) {
    var tasks = new List<Task>();
    // semaphore, allow to run 10 tasks in parallel
    using (var semaphore = new SemaphoreSlim(10)) {
        foreach (var url in urls) {
            // await here until there is a room for this task
            await semaphore.WaitAsync();
            tasks.Add(MakeRequest(semaphore, url));
        }
        // await for the rest of tasks to complete
        await Task.WhenAll(tasks);
    }
}

private static async Task MakeRequest(SemaphoreSlim semaphore, string url) {
    try {
        var request = (HttpWebRequest) WebRequest.Create(url);

        using (var response = await request.GetResponseAsync().ConfigureAwait(false)) {
            // do something with response    
        }
    }
    catch (Exception ex) {
        // do something
    }
    finally {
        // don't forget to release
        semaphore.Release();
    }
}
Run Code Online (Sandbox Code Playgroud)


Mat*_*ero 11

这3个apporaches都不好.

您不应该使用Parallel该类,或Task.Run在此方案中使用.

相反,有一个async处理程序方法:

private async Task HandleResponse(Task<HttpResponseMessage> gettingResponse)
{
     HttpResponseMessage response = await gettingResponse;
     // Process the data
}
Run Code Online (Sandbox Code Playgroud)

然后使用Task.WhenAll:

Task[] requests = myList.Select(l => SendWebRequest(l.Id))
                        .Select(r => HandleResponse(r))
                        .ToArray();

await Task.WhenAll(requests);
Run Code Online (Sandbox Code Playgroud)