如何限制c#中的最大并行任务数

Mat*_*gan 28 .net c# asynchronous

我有一个1000输入消息的集合来处理.我正在循环输入集合并为每个消息启动新任务以进行处理.

//Assume this messages collection contains 1000 items
var messages = new List<string>();

foreach (var msg in messages)
{
   Task.Factory.StartNew(() =>
   {
    Process(msg);
   });
 }
Run Code Online (Sandbox Code Playgroud)

我们可以猜测当时同时处理多少个最大消息(假设是普通的四核处理器),还是我们可以限制当时要处理的最大消息数?

如何确保以与Collection相同的顺序/顺序处理此消息?

Har*_*sad 39

你可以使用Parallel.Foreach和依赖MaxDegreeOfParallelism.

Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10},
msg =>
{
     // logic
     Process(msg);
});
Run Code Online (Sandbox Code Playgroud)

  • 我只是想提醒大家不要使用 Parallel.ForEach 来执行异步 I/O 绑定任务。它并不是真正为异步操作创建的。它只会启动 X 个线程池线程并在 I/O 等待期间阻塞它们。使用“SemaphoreSlim”代替 (6认同)
  • 这正是`Parallel.ForEach` 的处理方式。 (3认同)

Cle*_*gic 25

SemaphoreSlim在这种情况下是一个非常好的解决方案,我强烈建议OP尝试这个,但@ Manoj的答案有注释中提到的缺陷.在产生这样的任务之前应该等待.

更新后的答案:由于@Vasyl指出信号量可能在任务完成之前被处理,并且在Release()调用方法时会引发异常,因此在退出使用块之前必须等待所有创建的任务完成.

int maxConcurrency=10;
var messages = new List<string>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach(var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}
Run Code Online (Sandbox Code Playgroud)

如果没有Task.WaitAll 在控制台应用程序中运行以下代码,那么想要查看信号量如何处理的人可以回答评论,并且会引发此异常.

System.ObjectDisposedException:'信号量已被释放.'

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{            
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}
Run Code Online (Sandbox Code Playgroud)

  • 如果“Process”方法长时间运行会怎样?当 `concurrencySemaphore` 已经被释放时,可以调用 `concurrencySemaphore.Release()`。结果是 - `ObjectDisposeException`。 (2认同)
  • 在所有任务完成之前,信号量怎么可能被处理掉? (2认同)

5an*_*dr0 8

随着.NET 5.0Core 3.0 通道的引入。
这种生产者/消费者并发模式的主要好处是您还可以限制输入数据处理以减少资源影响。
这在处理数百万条数据记录时特别有用。
您现在可以连续查询数据块,并等待工作人员处理它,然后再查询更多数据,而不是立即将整个数据集读取到内存中。

队列容量为 50 条消息且最大逻辑处理器作为消费者线程的代码示例:

/// <exception cref="System.AggregateException">Thrown on Consumer Task exceptions.</exception>
public static async Task ProcessMessages(
    List<string> messages,
    int producerCapacity = 50, 
    int consumerTaskLimit = 0)
{
    if (consumerTaskLimit == 0)
        consumerTaskLimit = Environment.ProcessorCount;
    // by default only uses one processor group
    // https://stackoverflow.com/questions/27965962/c-sharp-environment-processorcount-does-not-always-return-the-full-number-of-log

    var tokenSource = new CancellationTokenSource();
    CancellationToken ct = tokenSource.Token;

    var channel = Channel.CreateBounded<string>(producerCapacity);

    _ = Task.Run(async () =>
    {
        try
        {
            foreach (var msg in messages)
            {
                await channel.Writer.WriteAsync(msg, ct);
                ct.ThrowIfCancellationRequested();
                // blocking when channel is full
                // waiting for the consumer tasks to pop messages from the queue
            }
        }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            Console.WriteLine("Exception while processing Messages\n" + ex);
            tokenSource.Cancel();
        }
        finally
        {
            channel.Writer.Complete();
            // signaling the end of queue so that 
            // WaitToReadAsync will return false to stop the consumer tasks
        }
    });

    var consumerTasks = Enumerable
    .Range(1, consumerTaskLimit)
    .Select(_ => Task.Run(async () =>
    {
        try
        {
            while (await channel.Reader.WaitToReadAsync(ct))
            {
                ct.ThrowIfCancellationRequested();
                while (channel.Reader.TryRead(out var message))
                {
                    await Task.Delay(500);
                    Console.WriteLine(message);
                }
            }
        }
        catch (OperationCanceledException) { }
        catch
        {
            tokenSource.Cancel();
            throw;
        }
    }))
    .ToArray();

    Task waitForConsumers = Task.WhenAll(consumerTasks);
    try { await waitForConsumers; }
    catch
    {
        if (waitForConsumers.IsFaulted && waitForConsumers.Exception is not null)
        {
            foreach (var e in waitForConsumers.Exception.Flatten().InnerExceptions)
                Console.WriteLine(e.ToString());

            throw waitForConsumers.Exception.Flatten();
        }
        else throw;
    }
}
Run Code Online (Sandbox Code Playgroud)

正如Theodor Zoulias所指出的:在多个消费者异常时,剩余的任务将继续运行,并且必须承担被杀死任务的负载。为了避免这种情况,我实现了 CancellationToken 来停止所有剩余任务并处理waitForConsumers.ExceptionAggregateException中组合的异常。

旁注:
任务并行库 (TPL)可能擅长根据本地资源自动限制任务。但是,当您通过 RPC 远程处理数据时,有必要手动限制 RPC 调用,以避免填满网络/处理堆栈!

  • 是的,现在好多了。此时,您可能已经意识到使用“通道”和“任务”实现此类功能既具有挑战性又费力。实际上,这样做没有多大意义,除了作为一种学习体验之外,当此功能已经在 .NET 5 中以 `ActionBlock&lt;T&gt;` 类的形式本地提供时(更不用说 `TransformBlock&lt; TInput,TOutput&gt;` 以及 TPL 数据流库的其他强大块)。您可以在[此处](/sf/answers/4567636461/) 中看到“ActionBlock&lt;T&gt;”。 (2认同)
  • 两年后,我应该提到的是,我不再对 TPL 数据流库(`ActionBlock&lt;T&gt;`)那么热衷。该库的组件具有令人讨厌的[设计](https://github.com/dotnet/runtime/issues/29619“如果抛出 TaskCanceledException,Dataflow TransformBlock 将默默失败”)抑制任何可能出现的“OperationCanceledException”行为。由“action”抛出,导致在这种情况下错误地报告成功的“Completion”。 (2认同)

小智 7

我认为使用Parallel LINQ会更好

  Parallel.ForEach(messages ,
     new ParallelOptions{MaxDegreeOfParallelism = 4},
            x => Process(x);
        );
Run Code Online (Sandbox Code Playgroud)

其中x是MaxDegreeOfParallelism


归档时间:

查看次数:

28018 次

最近记录:

6 年,7 月 前