Mat*_*olf 10 c# concurrency producer-consumer task-parallel-library tpl-dataflow
我在不同的任务上运行一个非常典型的生产者/消费者模型
Task1:从二进制文件中读取一批byte [],并为每个字节数组集合启动一个新任务.(该操作是为了内存管理目的而批量处理的).
任务2-n:这些是工作任务,每个都在字节数组的传入集合(来自Tasks1)上运行,并对字节数组进行反序列化,按特定条件对它们进行排序,然后存储结果对象的集合(每个字节数组)在"并发字典"中反序列化为此类对象.
任务(n + 1)我选择了并发字典,因为此任务的工作是以与它们来自Task1的顺序相同的顺序合并存储在并发字典中的那些集合.我通过传递一个collectionID(它是int类型并为Task1中的每个新集合递增)从Task1到此任务一直向下实现.此任务基本上检查下一个预期的collectionID是否已存储在并发字典中,如果是,则将其取出,将其添加到Final Queue并检查并发字典中的下一个集合.
现在,从我所看到的和我观看的视频看来,TPL Dataflow可能是这种生产者/消费者模型的完美候选者.我似乎无法设计并因此开始,因为我从未使用过TPL Dataflow.在吞吐量和延迟方面,这个库甚至可以完成任务吗?我目前处理250万字节数组,因此在生成的集合中每秒处理对象.TPL Dataflow可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL Dataflow可以在产生工作任务时保留Task1中的集合批次的顺序,并在工作任务完成后重新合并它们吗?它是否优化了什么?在对整个结构进行分析之后,我觉得由于旋转和涉及太多并发集合而浪费了相当多的时间.
任何想法,想法?
svi*_*ick 12
编辑:事实证明我错了.TransformBlock 确实按照它们进入的顺序返回项目,即使它是为并行性配置的.因此,我原来答案中的代码完全无用,TransformBlock可以使用普通代码.
原始答案:
据我所知,.Net中只有一个并行构造支持按照它们的顺序返回已处理的项目:PLINQ with AsOrdered().但在我看来,PLINQ并不适合你想要的.
另一方面,TPL Dataflow非常适合我,但它没有一个块可以同时支持并行和返回项目(TransformBlock支持它们,但不能同时支持).幸运的是,Dataflow块在设计时考虑了可组合性,因此我们可以构建自己的块来实现这一点.
但首先,我们必须弄清楚如何订购结果.像你建议的那样使用并发字典以及一些同步机制肯定会起作用.但我认为有一个更简单的解决方案:使用Tasks 队列.在输出任务中,您将a出列Task,等待它完成(异步),当它出现时,您将其结果发送出去.当队列为空时,我们仍然需要一些同步,但如果我们选择巧妙使用哪个队列,我们可以免费获得.
所以,一般的想法是这样的:我们写的是一个IPropagatorBlock,有一些输入和一些输出.创建自定义的最简单方法IPropagatorBlock是创建一个处理输入的块,另一个生成结果的块并将它们视为一个使用DataflowBlock.Encapsulate().
输入块必须以正确的顺序处理传入的项目,因此不存在并行化.它将创建一个新的Task(实际上,a TaskCompletionSource,以便我们可以设置Task后者的结果),将其添加到队列中,然后发送项目进行处理,以及设置正确结果的某种方式Task.因为我们不需要将这个块链接到任何东西,我们可以使用ActionBlock.
输出块必须Task从队列中取出s,异步等待它们,然后发送它们.但是由于所有块都嵌入了一个队列,并且带有委托的块具有内置的异步等待,这将非常简单:new TransformBlock<Task<TOutput>, TOutput>(t => t).该块既可用作队列,也可用作输出块.因此,我们不必处理任何同步.
拼图的最后一部分实际上是并行处理项目.为此,我们可以使用另一个ActionBlock,这次是MaxDegreeOfParallelismset.它将接受输入,处理它,并Task在队列中设置正确的结果.
放在一起,它可能看起来像这样:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
Run Code Online (Sandbox Code Playgroud)
经过这么多的讨论,我认为这是相当少量的代码.
看起来你很关心性能,所以你可能需要微调这段代码.例如,MaxDegreeOfParallelism将processor块设置为类似的东西可能是有意义的Environment.ProcessorCount,以避免超额订阅.此外,如果延迟对于您来说比吞吐量更重要,那么MaxMessagesPerTask将同一个块设置为1(或另一个小数字)可能是有意义的,这样当项目处理完成后,它会立即发送到输出.
另外,如果要限制进入的项目,您可以设置BoundedCapacity的enqueuer.
| 归档时间: |
|
| 查看次数: |
2352 次 |
| 最近记录: |