并行 - 将项目添加到正在迭代的集合中,或等效的?

Eri*_*ric 5 c# parallel-processing task-parallel-library parallel.foreach

现在,我有一个 C# 程序,它定期执行以下步骤:

  • 从数据库中获取当前任务列表
  • 使用Parallel.ForEach(), do work 完成每项任务

然而,其中一些任务运行时间非常长。这会延迟其他待处理任务的处理,因为我们只在程序开始时查找新任务。

现在,我知道修改正在迭代的集合是不可能的(对吗?),但是 C# 框架中是否有一些等效的功能Parallel允许我将工作添加到列表中,同时处理列表中的项目?

svi*_*ick 3

一般来说,您是对的,在迭代集合时修改集合是不允许的。但您还可以使用其他方法:

  • ActionBlock<T>从 TPL Dataflow使用。代码可能类似于:

    var actionBlock = new ActionBlock<MyTask>(
        task => DoWorkForTask(task),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    
    while (true)
    {
        var tasks = GrabCurrentListOfTasks();
        foreach (var task in tasks)
        {
            actionBlock.Post(task);
    
            await Task.Delay(someShortDelay);
            // or use Thread.Sleep() if you don't want to use async
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 使用BlockingCollection<T>,可以在使用其中的项目时对其进行修改,并与GetConsumingParititioner()ParallelExtensionsExtras 一起使用,以使其与 一起使用Parallel.ForEach()

    var collection = new BlockingCollection<MyTask>();
    
    Task.Run(async () =>
    {
        while (true)
        {
            var tasks = GrabCurrentListOfTasks();
            foreach (var task in tasks)
            {
                collection.Add(task);
    
                await Task.Delay(someShortDelay);
            }
        }
    });
    
    Parallel.ForEach(collection.GetConsumingPartitioner(), task => DoWorkForTask(task));
    
    Run Code Online (Sandbox Code Playgroud)