与异步lambda并行的foreach

cla*_*ndk 100 c# task-parallel-library async-await parallel.foreach

我想并行处理一个集合,但是我在实现它时遇到了麻烦,因此我希望得到一些帮助.

如果我想在并行循环的lambda中调用C#中标记为async的方法,则会出现问题.例如:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;
Run Code Online (Sandbox Code Playgroud)

计数为0时会出现问题,因为创建的所有线程实际上只是后台线程,并且Parallel.ForEach调用不等待完成.如果我删除async关键字,该方法如下所示:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;
Run Code Online (Sandbox Code Playgroud)

它工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见删除).

如何实现一个Parallel.ForEach在lambda中使用await关键字的循环?可能吗?

Parallel.ForEach方法的原型采用Action<T>as参数,但我希望它等待我的异步lambda.

Ste*_*ary 150

如果您只想要简单的并行性,可以这样做:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
Run Code Online (Sandbox Code Playgroud)

如果你需要更复杂的东西,请查看Stephen Toub的ForEachAsync帖子.

  • 可能需要一种限制机制.这将立即创建尽可能多的任务,这些项目最终可能会出现在10k网络请求等中. (37认同)
  • @usr Stephen Toub的文章中的最后一个例子说明了这一点. (7认同)
  • 此方法的一个简单限制机制是将列表拆分为包含 N 个条目的小列表,并为每个较小的批次执行此任务 select + Task.WhenAll。这样,您就不会为大型数据集生成数千个任务。 (4认同)
  • @LukePuplett它创建`dop`任务,然后每个任务依次处理输入集合的某些子集。 (2认同)
  • @Afshin_Zavvar:如果在不等待结果的情况下调用Task.Run,​​那只是在线程池上丢下了即弃的工作。那几乎总是一个错误。 (2认同)

Lib*_*tad 144

Parallel.ForEachAsync是新的 .NET 6 API 之一,它是一种安排异步工作的方法,允许您控制并行度:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});
Run Code Online (Sandbox Code Playgroud)

Scott Hanselman 博客中的另一个例子。

来源,供参考。


Ser*_*nov 44

您可以使用AsyncEnumerator NuGet Package中ParallelForEachAsync扩展方法:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Run Code Online (Sandbox Code Playgroud)

  • @ppumkin,是的,这是我的.我一遍又一遍地看到这个问题,所以决定尽可能以最简单的方式解决它,让其他人免于挣扎:) (13认同)
  • 这是你的包裹?我现在在几个地方看到你发这个了?:D 哦等等..你的名字在包裹上:D +1 (2认同)
  • 你有一个错字:`maxDegreeOfParallelism`>`maxDegreeOfParalellism` (2认同)
  • 正确的拼写确实是maxDegreeOfParallelism,但是在@ ShiranDror的注释中有一些东西 - 在你的包中你错误地调用了变量maxDegreeOfParalellism(因此你引用的代码在你改变之前不会编译..) (2认同)

小智 27

SemaphoreSlim你可以实现并行控制。

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  try
  {
     await throttler.WaitAsync();
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;
Run Code Online (Sandbox Code Playgroud)

  • 另外,这一行“awaitthrottler.WaitAsync();” 不应该在 try 块内,因为如果它抛出异常,您将在尚未获取锁时调用 Release 。 (6认同)
  • SemaphoreSlim 应该用 ```using``` 语句包裹起来,因为它实现了 IDisposable (2认同)

Ale*_*lex 17

从其他答案和接受的 asnwer 引用的文章编译的最简单的扩展方法:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)

更新:这是一个简单的修改,它还支持评论中请求的取消令牌(未经测试)

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)

  • @TheodorZoulias 都是好点,已编辑。另外,我们还致力于支持“cancellationToken”的变体,稍后将发布。 (2认同)

nic*_*nko 5

我的 ParallelForEach 异步的轻量级实现。

特征:

  1. 节流(最大并行度)。
  2. 异常处理(完成时会抛出聚合异常)。
  3. 内存高效(无需存储任务列表)。

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

使用示例:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
Run Code Online (Sandbox Code Playgroud)


归档时间:

查看次数:

68112 次

最近记录:

6 年,8 月 前