限制通过并行任务库运行的活动任务数的最佳方法

Rya*_*yan 36 .net c# task-parallel-library

考虑一个包含大量需要处理的作业的队列.队列限制一次只能获得1个工作,无法知道有多少工作.这些作业需要10秒才能完成,并且需要大量等待来自Web服务的响应,因此不受CPU限制.

如果我使用这样的东西

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}
Run Code Online (Sandbox Code Playgroud)

然后,它会以比完成它们更快的速度从队列中快速弹出作业,耗尽内存并堕落.> <

我不能使用(我不认为)ParallelOptions.MaxDegreeOfParallelism因为我不能使用Parallel.Invoke或Parallel.ForEach

我找到了3个替代方案

  1. 用.替换Task.Factory.StartNew

    Task task = new Task(job.Execute,TaskCreationOptions.LongRunning)
    task.Start();
    
    Run Code Online (Sandbox Code Playgroud)

    这似乎在某种程度上解决了这个问题,但我不清楚这是做什么的,如果这是最好的方法.

  2. 创建一个限制并发度的自定义任务调度程序

  3. 使用类似BlockingCollection的东西在启动时将作业添加到集合中,并在完成时删除以限制可以运行的编号.

#1我必须相信自己做出正确的决定,#2 /#3我必须计算出自己可以运行的最大数量的任务.

我是否理解正确 - 这是更好的方式,还是有另一种方式?

编辑 - 这是我从下面的答案,生产者 - 消费者模式中得出的结果.

除了整体吞吐量目标不是要比可以处理的更快地使作业出列并且没有多个线程轮询队列(这里没有显示但是这是非阻塞的操作并且如果从多个地方以高频率轮询将导致巨大的交易成本) .

// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      while (!jobs.IsCompleted)
      {
         var job = jobs.Take();
         job.Execute();
      }
   }
   consumer.Start();
}

// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompletedAdding()
       // May need to wait for running jobs to finish
       break;
    }
}
Run Code Online (Sandbox Code Playgroud)

usr*_*usr 22

我刚给出了一个非常适用于这个问题的答案.

基本上,TPL Task类用于安排CPU绑定的工作.它不是用于阻止工作.

您正在使用非CPU的资源:等待服务回复.这意味着TPL会错误地管理您的资源,因为它会在一定程度上假定CPU的有限性.

自己管理资源:启动固定数量的线程或LongRunning任务(基本相同).凭经验确定线程数.

你不能把不可靠的系统投入生产.出于这个原因,我建议#1但受到限制.不要创建与工作项一样多的线程.创建尽可能多的线程来使远程服务饱和.给自己写一个帮助函数,它产生N个线程并使用它们来处理M个工作项.通过这种方式可以获得完全可预测且可靠的结果.

  • @Ryan,我推荐#1但是*throttled*不要创建与工作项一样多的线程.创建尽可能多的线程来使远程服务饱和.给自己写一个帮助函数,它产生N个线程并使用它们来处理M个工作项.通过这种方式可以获得完全可预测且可靠的结果. (3认同)

Moo*_*tom 12

await在您的代码或第三方库中导致的潜在流分裂和延续将无法很好地处理长时间运行的任务(或线程),因此不要使用长时间运行的任务.在async/await世界上,它们毫无用处.更多细节在这里.

您可以在拨打电话ThreadPool.SetMaxThreads之前拨打电话,确保ThreadPool.SetMinThreads使用低于或等于最大值的值设置最小线程数.顺便说一句,MSDN文档是错误的.您可以使用这些方法调用低于计算机上的核心数,至少在.NET 4.5和4.6中,我使用此技术来降低内存限制32位服务的处理能力.

但是,如果您不希望限制整个应用程序而只限制它的处理部分,则自定义任务调度程序将完成此任务.很久以前,MS发布了几个自定义任务调度程序的样本,包括一个LimitedConcurrencyLevelTaskScheduler.手动生成主要处理任务Task.Factory.StartNew,提供自定义任务调度程序,并由它生成的每个其他任务将使用它,包括async/await甚至Task.Yield用于在async方法的早期实现异步.

但是对于您的特定情况,两种解决方案都不会在完成之前停止用尽您的工作队列.这可能是不可取的,具体取决于您的队列的实现和目的.它们更像是"解雇一堆任务,让调度程序找到执行它们的时间"类型的解决方案.因此,或许更合适的事情可能是通过更严格的方法来控制作业的执行semaphores.代码如下所示:

semaphore = new SemaphoreSlim(max_concurrent_jobs);

while(...){
 job = Queue.PopJob();
 semaphore.Wait();
 ProcessJobAsync(job);
}

async Task ProcessJobAsync(Job job){
 await Task.Yield();
 ... Process the job here...
 semaphore.Release();
}
Run Code Online (Sandbox Code Playgroud)

皮肤猫的方法不止一种.使用您认为合适的.

  • LimitedConcurrencyLevelTask​​Scheduler是黄金.现在还有一个nuget包,https://www.nuget.org/packages/ParallelExtensionsExtras/ (2认同)

Alo*_*atz 8

微软有一个非常酷的库,名为DataFlow,它可以完全满足您的需求(以及更多).细节在这里.

您应该使用ActionBlock类并设置ExecutionDataflowBlockOptions对象的MaxDegreeOfParallelism.ActionBlock可以很好地使用async/await,因此即使等待外部调用,也不会开始处理新的作业.

ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
     MaxDegreeOfParallelism = 10
};

this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
            actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess)
Run Code Online (Sandbox Code Playgroud)


svi*_*ick 7

这里的问题似乎并没有太多的运行 Task,它的计划 太多了Task.Task无论执行速度有多快,您的代码都会尽量安排尽可能多的s.如果你有太多的工作,这意味着你会得到OOM.

因此,您提出的解决方案都不会真正解决您的问题.如果看起来只是指定LongRunning解决你的问题,那么这很可能是因为创建一个新的Thread(这是什么LongRunning)需要一些时间,这有效地限制了新的工作.因此,此解决方案只能偶然发挥作用,并且很可能在以后导致其他问题.

至于解决办法,我大多与USR同意:那效果相当好最简单的解决方法是创建固定数量的LongRunning任务,并有一个循环调用Queue.PopJob()(由一个受保护的lock,如果该方法不是线程安全的)和Execute()S上的工作.

更新:经过一番思考后,我意识到以下尝试最有可能表现得非常糟糕.只有在您确定它能够很好地适合您时才使用它.


但是TPL试图找出最佳并行度,即使对于IO绑定也是如此Task.因此,您可以尝试使用它来获得优势.Long Tasks在这里不起作用,因为从TPL的角度来看,似乎没有完成任何工作,它会Task一遍又一遍地开始新的工作.你可以做的是Task在每个结束时开始一个新的Task.通过这种方式,TPL将知道发生了什么,并且其算法可能运行良好.另外,为了让TPL决定并行度,在Task它的第一行开始时,开始另一行Tasks.

该算法可能效果很好.但也有可能TPL会对并行度做出错误的决定,我实际上没有尝试过这样的事情.

在代码中,它看起来像这样:

void ProcessJobs(bool isFirst)
{
    var job = Queue.PopJob(); // assumes PopJob() is thread-safe
    if (job == null)
        return;

    if (isFirst)
        Task.Factory.StartNew(() => ProcessJobs(true));

    job.Execute();

    Task.Factory.StartNew(() => ProcessJob(false));
}
Run Code Online (Sandbox Code Playgroud)

然后开始吧

Task.Factory.StartNew(() => ProcessJobs(true));
Run Code Online (Sandbox Code Playgroud)