如何限制多个异步任务?

Sha*_*ica 3 c# asynchronous task-parallel-library async-await

我有一些以下形式的代码:

static async Task DoSomething(int n) 
{
  ...
}

static void RunThreads(int totalThreads, int throttle) 
{
  var tasks = new List<Task>();
  for (var n = 0; n < totalThreads; n++)
  {
    var task = DoSomething(n);
    tasks.Add(task);
  }
  Task.WhenAll(tasks).Wait(); // all threads must complete
}
Run Code Online (Sandbox Code Playgroud)

麻烦的是,如果我没有限制线程,事情就会开始崩溃.现在,我想启动最多的throttle线程,并且仅在旧线程完成时才启动新线程.我尝试过几种方法,迄今为止没有一种方法可行.我遇到的问题包括:

  • tasks集合必须与所有的任务被完全填充,无论是主动还是等待执行,否则最后的.Wait()通话只着眼于它开始与线程.
  • 链接执行似乎需要使用Task.Run()等.但是我需要从一开始就引用每个任务,并且实例化任务似乎会自动启动它,这是我不想要的.

这该怎么做?

Ste*_*ary 18

首先,从线程中抽象出来。特别是因为您的操作是异步的,您根本不应该考虑“线程”。在异步世界中,您有任务,并且与线程相比,您可以拥有大量的任务。

可以使用SemaphoreSlim以下方法限制异步代码:

static async Task DoSomething(int n);

static void RunConcurrently(int total, int throttle) 
{
  var mutex = new SemaphoreSlim(throttle);
  var tasks = Enumerable.Range(0, total).Select(async item =>
  {
    await mutex.WaitAsync();
    try { await DoSomething(item); }
    finally { mutex.Release(); }
  });
  Task.WhenAll(tasks).Wait();
}
Run Code Online (Sandbox Code Playgroud)


i3a*_*non 10

最简单的选择IMO是使用TPL Dataflow.您只需创建一个ActionBLock,按所需的并行度限制它并开始将项目发布到其中.它确保只同时运行一定数量的任务,当任务完成时,它开始执行下一个项目:

async Task RunAsync(int totalThreads, int throttle) 
{
    var block = new ActionBlock<int>(
        DoSomething,
        new ExecutionDataFlowOptions { MaxDegreeOfParallelism = throttle });

    for (var n = 0; n < totalThreads; n++)
    {
        block.Post(n);
    }

    block.Complete();
    await block.Completion;
}
Run Code Online (Sandbox Code Playgroud)


Sri*_*vel 7

如果我理解正确,您可以启动throttle参数提到的任务数量有限的任务,并等待它们在下一个开始之前完成.

要在开始新任务之前等待所有已启动的任务完成,请使用以下实现.

static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            await Task.WhenAll(tasks);
            tasks.Clear();
        }
    }
    await Task.WhenAll(tasks); // wait for remaining
}
Run Code Online (Sandbox Code Playgroud)

要在完成任务时添加任务,可以使用以下代码

static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            var completed = await Task.WhenAny(tasks);
            tasks.Remove(completed);
        }
    }
    await Task.WhenAll(tasks); // all threads must complete
}
Run Code Online (Sandbox Code Playgroud)


Rob*_*ner 6

Stephen Toub在他的基于任务的异步模式文档中给出了以下关于限制的示例.

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}
Run Code Online (Sandbox Code Playgroud)


Rob*_*ner 6

.NET 6 引入了Parallel.ForEachAsync. 你可以像这样重写你的代码:

static async ValueTask DoSomething(int n)
{
    ...
}

static Task RunThreads(int totalThreads, int throttle)
    => Parallel.ForEachAsync(Enumerable.Range(0, totalThreads), new ParallelOptions() { MaxDegreeOfParallelism = throttle }, (i, _) => DoSomething(i));
Run Code Online (Sandbox Code Playgroud)

笔记:

  • 我必须将DoSomething函数的返回类型从更改TaskValueTask
  • 您可能想避免.Wait()接到电话,所以我拨打了RunThreads方法设为异步。
  • 从您的示例来看,为什么您需要访问各个任务并不明显。此代码不允许您访问这些任务,但在许多情况下仍然可能有帮助。


Eni*_*ity 5

微软的Reactive Extensions(Rx)-NuGet“ Rx-Main”-很好地解决了此问题。

只是这样做:

static void RunThreads(int totalThreads, int throttle) 
{
    Observable
        .Range(0, totalThreads)
        .Select(n => Observable.FromAsync(() => DoSomething(n)))
        .Merge(throttle)
        .Wait();
}
Run Code Online (Sandbox Code Playgroud)

任务完成。