Sof*_*ion 12 dataflow task-parallel-library
使用Dataflow CTP(在TPL中)
如果在超时后当前排队或推迟的项目数小于BatchSize,是否有办法自动调用BatchBlock.TriggerBatch?
更好的是:每次块接收到新项时,此超时应重置为0.
Dre*_*rsh 16
是的,您可以通过将块链接在一起而相当优雅地完成此任务.在这种情况下,您需要设置一个您在BatchBlock"之前"链接的TransformBlock.这看起来像这样:
Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch());
TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
triggerBatchTimer.Change(5000, Timeout.Infinite);
return value;
});
timeoutTransformBlock.LinkTo(yourBatchBlock);
yourBufferBlock.LinkTo(timeoutTransformBlock);
Run Code Online (Sandbox Code Playgroud)
这是优秀的 Drew Marsh解决方案的监管版本。这一个使用DataflowBlock.Encapsulate方法创建一个封装了定时器+批处理功能的数据流块。除了新参数之外timeout,该CreateBatchBlock方法还支持普通BatchBlock构造函数可用的所有选项。
public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
var transformBlock = new TransformBlock<T, T>((T value) =>
{
timer.Change(timeout, Timeout.Infinite);
return value;
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
CancellationToken = dataflowBlockOptions.CancellationToken,
EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
NameFormat = dataflowBlockOptions.NameFormat,
TaskScheduler = dataflowBlockOptions.TaskScheduler
});
transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
{
PropagateCompletion = true
});
return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2345 次 |
| 最近记录: |