med*_*onn 5 c# parallel-processing parallel.foreach
我正在寻找一种“整洁”且有效的方法来实现长步骤 1(可以并行化)和步骤 2 的组合,步骤 2 需要按原始顺序(如果可能的话,尽量减少来自第一步保存在 RAM 中)同时允许第二步在第一个对象的步骤 1 中的数据可用时立即开始,并与步骤 2 一起提供更多数据。
为了更详细地说明这一点,我需要压缩大量图像(慢速 - 第 1 步),然后通过网络连接按顺序发送每个图像(第 2 步)。在任何阶段限制 RAM 中准备好的压缩数据块的数量也很重要,例如,如果发送 1000 张图像,我想将“已完成”但未发送的图像数量限制为(例如)线程数/使用的处理器。
我已经完成了这个的“手写”版本,使用了一组 Task 对象,但它看起来很混乱,而且我相信其他人一定有类似的需求,所以有没有更“标准”的方法来做到这一点? 理想情况下,我希望有 2 个代表的 Parallel.ForEach 变体 - 一个用于第 1 步,一个用于第 2 步,我希望标准覆盖之一(例如包含“localFinal”参数的覆盖)可能有所帮助,但在原来这些最后阶段是“每个线程”,而不是“每个委托”。
任何人都可以指出我现有的巧妙方法来实现这一目标吗?
您可以结合使用 Plinq(用于WithDegreeOfParallelism()限制第一阶段的并发)以及已完成块的 BlockingCollection。另请注意,它使用AsOrdered()保留原始顺序。
下面的例子演示了。对于您的实际应用程序,您可以将int此处显示的工作项替换为您的工作项类型 - 文件名或包含与每个工作项相关的信息的类。
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static void Main()
{
int maxThreads = 4;
int maxOutputQueueSize = 10;
var workItems = Enumerable.Range(1, 100); // Pretend these are your files
var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
var worker = Task.Run(() => output(outputQueue));
var parallelWorkItems =
workItems
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(maxThreads)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(process);
foreach (var item in parallelWorkItems)
outputQueue.Add(item);
outputQueue.CompleteAdding();
worker.Wait();
Console.WriteLine("Done.");
}
static int process(int value) // Pretend that this compresses the data.
{
Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
Thread.Sleep(250); // Simulate slow operation.
return value; // Return updated work item.
}
static void output(BlockingCollection<int> queue)
{
foreach (var item in queue.GetConsumingEnumerable())
Console.WriteLine($"Output is processing {item}");
Console.WriteLine("Finished outputting.");
}
}
}
Run Code Online (Sandbox Code Playgroud)
请注意如何限制输入队列处理(通过WithDegreeOfParallelism)和输出队列的大小(使用maxOutputQueueSize参数)。
或者,如果您使用 .Net 4.5 或更高版本,您可以查看TPL Dataflow 库,它对此类事情有很多支持。如果可以的话,我建议使用它 - 但在这里的答案中描述有点太多了。
| 归档时间: |
|
| 查看次数: |
494 次 |
| 最近记录: |