在一些.NET Parallel.ForEach()代码中做一些异步/等待是否可以?

Pur*_*ome 7 .net c# parallel-processing async-await parallel.foreach

考虑下面的代码,是不是OKasync/await里面Parallel.ForEach

例如.

Parallel.ForEach(names, name =>
{
    // Do some stuff...

    var foo = await GetStuffFrom3rdPartyAsync(name);

    // Do some more stuff, with the foo.
});
Run Code Online (Sandbox Code Playgroud)

还是有一些我需要知道的问题?

编辑:不知道这是否编译,顺便说一句.只是Pseduo代码..大声思考.

Sri*_*vel 6

不,这是没有意义的结合asyncParalell.Foreach.

请考虑以下示例:

private void DoSomething()
{
    var names = Enumerable.Range(0,10).Select(x=> "Somename" + x);
    Parallel.ForEach(names, async(name) =>
    {   
        await Task.Delay(1000);
        Console.WriteLine("Name {0} completed",name);
    });
    Console.WriteLine("Parallel ForEach completed");
}
Run Code Online (Sandbox Code Playgroud)

你会期望什么输出?

Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
Parallel ForEach completed
Run Code Online (Sandbox Code Playgroud)

那不会发生什么.它将输出:

Parallel ForEach completed
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
Run Code Online (Sandbox Code Playgroud)

为什么?因为当ForEach命中第await一个方法实际返回时,Parallel.ForEach不知道它是异步的并且它跑完了!代码await运行后作为另一个线程的延续而不是 "Paralell处理线程"

Stephen Toub在这里解决了这个问题


Ste*_*ary 5

从名字上看,我假设它GetStuffFrom3rdPartyAsync是 I/O 绑定的。的Parallel类是专门针对CPU结合的代码。

在异步世界中,您可以启动多个任务,然后(异步)使用Task.WhenAll. 由于您从一个序列开始,将每个元素投影到一个异步操作,然后等待所有这些操作可能是最简单的:

await Task.WhenAll(names.Select(async name =>
{
  // Do some stuff...
  var foo = await GetStuffFrom3rdPartyAsync(name);
  // Do some more stuff, with the foo.
}));
Run Code Online (Sandbox Code Playgroud)


Ned*_*nov 3

Parallel.ForEach正如 @Sriram Sakthivel 所指出的,使用异步 lambda存在一些问题。Steven Toub 也ForEachASync能做到同样的事情。他在这里谈论它,但这里是代码:

public static class Extensions
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate {
                                               using (partition) while (partition.MoveNext()) await body(partition.Current);
            }));
    }
}
Run Code Online (Sandbox Code Playgroud)

它使用该类Partitioner创建负载平衡分区器(doco),并允许您使用参数指定要运行的线程数dop。看看它和 之间的区别Parallel.ForEach。尝试以下代码。

 class Program
    {
        public static async Task GetStuffParallelForEach()
        {
            var data = Enumerable.Range(1, 10);
            Parallel.ForEach(data, async i =>
            {
                await Task.Delay(1000 * i);
                Console.WriteLine(i);
            });
        }

        public static async Task GetStuffForEachAsync()
        {
            var data = Enumerable.Range(1, 10);
            await data.ForEachAsync(5, async i =>
            {
                await Task.Delay(1000 * i);
                Console.WriteLine(i);
            });

        }

        static void Main(string[] args)
        {
            //GetStuffParallelForEach().Wait(); // Finished printed before work is complete
            GetStuffForEachAsync().Wait(); // Finished printed after all work is done
            Console.WriteLine("Finished");
            Console.ReadLine();
        }
Run Code Online (Sandbox Code Playgroud)

如果您运行GetStuffForEachAsync该程序,请等待所有工作完成。如果运行GetStuffParallelForEach,该行将Finished在工作完成之前打印。