如果排队项的数量小于BatchSize,如何在超时后自动调用TriggerBatch?

Sof*_*ion 12 dataflow task-parallel-library

使用Dataflow CTP(在TPL中)

如果在超时后当前排队或推迟的项目数小于BatchSize,是否有办法自动调用BatchBlock.TriggerBatch?

更好的是:每次块接收到新项时,此超时应重置为0.

Dre*_*rsh 16

是的,您可以通过将块链接在一起而相当优雅地完成此任务.在这种情况下,您需要设置一个您在BatchBlock"之前"链接的TransformBlock.这看起来像这样:

Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

timeoutTransformBlock.LinkTo(yourBatchBlock);

yourBufferBlock.LinkTo(timeoutTransformBlock);
Run Code Online (Sandbox Code Playgroud)


The*_*ias 5

这是优秀的 Drew Marsh解决方案的监管版本。这一个使用DataflowBlock.Encapsulate方法创建一个封装了定时器+批处理功能的数据流块。除了新参数之外timeout,该CreateBatchBlock方法还支持普通BatchBlock构造函数可用的所有选项。

public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
Run Code Online (Sandbox Code Playgroud)