在Parallel.ForEach中嵌套等待

Dar*_*g8r 159 c# wcf task-parallel-library async-await parallel.foreach

在metro应用程序中,我需要执行许多WCF调用.有大量的调用,所以我需要在并行循环中进行调用.问题是并行循环在WCF调用完成之前退出.

你会如何重构这个按预期工作?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)

svi*_*ick 150

背后的整个想法Parallel.ForEach()是你有一组线程,每个线程处理集合的一部分.正如您所注意到的,这不适用于async- await,您希望在异步调用期间释放线程.

你可以通过阻止ForEach()线程来"修复"它,但这会破坏整个点async- await.

你可以做的是使用TPL Dataflow代替Parallel.ForEach(),它支持异步Tasks.

具体来说,您的代码可以使用TransformBlock一个Customer使用asynclambda 将每个id转换为a 来编写.该块可以配置为并行执行.您可以将该块链接到ActionBlock将每个块写入Customer控制台的块.设置块网络后,您可以将Post()每个ID设置为TransformBlock.

在代码中:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)

虽然你可能想要将并行性限制TransformBlock为一些小的常量.此外,您可以限制其容量,TransformBlock并使用异步方式将项添加到其中SendAsync(),例如,如果集合太大.

与代码(如果有效)相比,一个额外的好处是,只要单个项目完成,写入就会立即开始,而不是等到所有处理完成.

  • @JasonLind确实如此.并行使用`Parallel.ForEach()`到`Post()`项应该没有任何实际效果. (4认同)
  • 非同步,反应式扩展,TPL和TPL DataFlow的简要概述 - http://vantsuyoshi.wordpress.com/2012/01/05/when-to-use-tpl-async-reactive-extension-tpl-dataflow/像我这样可能需要一些清晰度的人. (2认同)
  • 我的意思是在ActionBlock上指定MaxDegreeOfParallelism,就像在示例中对TransformBlock一样 (2认同)

Ste*_*ary 117

svick的答案(像往常一样)很棒.

但是,当您实际需要传输大量数据时,我发现Dataflow更有用.或者当您需要async兼容的队列时.

在您的情况下,更简单的解决方案是使用async-style并行性:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)

  • 如果你想手动限制并行性(在这种情况下你很可能会这样做),这样做会更复杂. (13认同)
  • @batmaci:`Parallel.ForEach`不支持`async`. (3认同)
  • @MikeT:这不会按预期工作。PLINQ 不理解异步任务,因此代码将仅并行化“async” lambda 的“起始”部分。 (3认同)
  • 但是你是对的,Dataflow 可能非常复杂(例如与 `Parallel.ForEach()` 相比)。但我认为这是目前对集合进行几乎所有“异步”工作的最佳选择。 (2认同)
  • @JamesManning `ParallelOptions` 有什么帮助?它仅适用于“Parallel.For/ForEach/Invoke”,作为建立的OP,它在这里没有用。 (2认同)
  • @Mike:“Parallel”(和“Task&lt;T&gt;”)是在“async”/“await”之前几年编写的,作为任务并行库(TPL)的一部分。当 `async`/`await` 出现时,他们可以选择创建自己的 `Future&lt;T&gt;` 类型以与 `async` 一起使用,或者重新使用 TPL 中现有的 `Task&lt;T&gt;` 类型。这两个决定显然都不正确,因此他们决定重新使用“Task&lt;T&gt;”。 (2认同)

Oha*_*der 74

使用DataFlow作为svick建议可能有点过分,而Stephen的回答并没有提供控制操作并发性的方法.但是,这可以简单地实现:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}
Run Code Online (Sandbox Code Playgroud)

ToArray()呼叫可以通过使用数组,而不是一个列表,并更换完成的任务进行优化,但我怀疑这会挣很多在大多数情况下的差别.OP问题的样本使用情况:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
Run Code Online (Sandbox Code Playgroud)

编辑研究员SO用户和TPL Wiz Eli Arbel向我指出了Stephen Toub相关文章.像往常一样,他的实现既优雅又高效:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });

        })); 
}
Run Code Online (Sandbox Code Playgroud)

  • @Terry它会冒泡到调用者,因为最顶层的任务(由`Task.WhenAll`创建)将包含异常(在`AggregateException`中),并且如果所述调用者使用`await`,异常将在调用站点中抛出.但是,`Task.WhenAll`仍将等待*all*任务完成,并且`getPartitions`将在调用`partition.MoveNext`时动态分配元素,直到不再需要处理任何元素为止.这意味着除非您添加自己的机制来停止处理(例如`CancellationToken`),否则它本身不会发生. (3认同)
  • @MichaelFreidgeim 你可以在`await body` 之前做一些类似`var current = partition.Current` 的事情,然后在continuation 中使用`current` (`ContinueWith(t =&gt; { ... }`)。 (2认同)
  • Stephen Toub 文章的更新链接:https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2 (2认同)

Ser*_*nov 34

您可以使用新的AsyncEnumerator NuGet包节省工作量,该在4年前最初发布问题时不存在.它允许您控制并行度:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);
Run Code Online (Sandbox Code Playgroud)

免责声明:我是AsyncEnumerator库的作者,该库是开源的并且在MIT下获得许可,我发布此消息只是为了帮助社区.

  • 谢尔盖,你应该透露你是图书馆的作者 (10认同)
  • 好的,添加了免责声明.我不是通过广告来寻求任何好处,只是想帮助别人;) (5认同)
  • @CornielNobel,它与.NET Core兼容--GitHub上的源代码具有.NET Framework和.NET Core的测试范围. (2认同)
  • @SergeSemenov 我经常使用你的库的“AsyncStreams”,我不得不说它非常棒。强烈推荐这个库。 (2认同)

小智 14

Parallel.Foreacha换成a Task.Run()而不是await关键字use[yourasyncmethod].Result

(你需要做Task.Run的事情来阻止UI线程)

像这样的东西:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
Run Code Online (Sandbox Code Playgroud)

  • 这有什么问题?我完全是这样做的.让`Parallel.ForEach`执行并行工作,直到完成所有工作,然后将整个事务推送到后台线程以获得响应式UI.有什么问题吗?也许这是一个睡眠过多的线程,但它是简短易读的代码. (3认同)

Joh*_*zen 7

这应该非常高效,并且比使整个TPL Dataflow工作更容易:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
Run Code Online (Sandbox Code Playgroud)


Jay*_*hah 7

一种使用 SemaphoreSlim 并允许设置最大并行度的扩展方法

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }
Run Code Online (Sandbox Code Playgroud)

示例用法:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Run Code Online (Sandbox Code Playgroud)


Teo*_*ahi 6

我参加聚会有点晚了,但您可能想考虑使用 GetAwaiter.GetResult() 在同步上下文中运行您的异步代码,但如下所示;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
Run Code Online (Sandbox Code Playgroud)


Vit*_*kov 5

在介绍了一堆辅助方法之后,您将能够使用以下简单的语法运行并行查询:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);
Run Code Online (Sandbox Code Playgroud)

这里发生的事情是:我们将源集合分成 10 个块 ( .Split(DegreeOfParallelism)),然后运行 ​​10 个任务,每个任务一个一个地处理其项目 ( .SelectManyAsync(...)) 并将它们合并回一个列表。

值得一提的是,有一种更简单的方法:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);
Run Code Online (Sandbox Code Playgroud)

但它需要一个预防措施:如果您的源集合太大,它会立即Task为每个项目安排一个,这可能会导致显着的性能下降。

上述示例中使用的扩展方法如下所示:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
Run Code Online (Sandbox Code Playgroud)