`Parallel.ForEach` 带有按定义顺序的最后一步

med*_*onn 5 c# parallel-processing parallel.foreach

我正在寻找一种“整洁”且有效的方法来实现长步骤 1(可以并行化)和步骤 2 的组合,步骤 2 需要按原始顺序(如果可能的话,尽量减少来自第一步保存在 RAM 中)同时允许第二步在第一个对象的步骤 1 中的数据可用时立即开始,并与步骤 2 一起提供更多数据。

为了更详细地说明这一点,我需要压缩大量图像(慢速 - 第 1 步),然后通过网络连接按顺序发送每个图像(第 2 步)。在任何阶段限制 RAM 中准备好的压缩数据块的数量也很重要,例如,如果发送 1000 张图像,我想将“已完成”但未发送的图像数量限制为(例如)线程数/使用的处理器。

我已经完成了这个的“手写”版本,使用了一组 Task 对象,但它看起来很混乱,而且我相信其他人一定有类似的需求,所以有没有更“标准”的方法来做到这一点? 理想情况下,我希望有 2 个代表的 Parallel.ForEach 变体 - 一个用于第 1 步,一个用于第 2 步,我希望标准覆盖之一(例如包含“localFinal”参数的覆盖)可能有所帮助,但在原来这些最后阶段是“每个线程”,而不是“每个委托”。

任何人都可以指出我现有的巧妙方法来实现这一目标吗?

Mat*_*son 4

您可以结合使用 Plinq(用于WithDegreeOfParallelism()限制第一阶段的并发)以及已完成块的 BlockingCollection。另请注意,它使用AsOrdered()保留原始顺序。

下面的例子演示了。对于您的实际应用程序,您可以将int此处显示的工作项替换为您的工作项类型 - 文件名或包含与每个工作项相关的信息的类。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            int maxThreads = 4;
            int maxOutputQueueSize = 10;
            var workItems = Enumerable.Range(1, 100); // Pretend these are your files
            var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
            var worker = Task.Run(() => output(outputQueue));

            var parallelWorkItems = 
                workItems
                .AsParallel()
                .AsOrdered()
                .WithDegreeOfParallelism(maxThreads)
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(process);

            foreach (var item in parallelWorkItems)
                outputQueue.Add(item);

            outputQueue.CompleteAdding();
            worker.Wait();

            Console.WriteLine("Done.");
        }

        static int process(int value) // Pretend that this compresses the data.
        {
            Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
            Thread.Sleep(250);  // Simulate slow operation.
            return value; // Return updated work item.
        }

        static void output(BlockingCollection<int> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
                Console.WriteLine($"Output is processing {item}");

            Console.WriteLine("Finished outputting.");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意如何限制输入队列处理(通过WithDegreeOfParallelism)和输出队列的大小(使用maxOutputQueueSize参数)。

或者,如果您使用 .Net 4.5 或更高版本,您可以查看TPL Dataflow 库,它对此类事情有很多支持。如果可以的话,我建议使用它 - 但在这里的答案中描述有点太多了。