BatchBlock 使用 TriggerBatch() 之后发送的元素生成批次

woj*_*rak 4 c# task-parallel-library tpl-dataflow

我有一个由多个块组成的数据流管道。当元素流经我的处理管道时,我想按 field 对它们进行分组A。为此,我有一个BatchBlockBoundedCapacity. 我在其中存储我的元素,直到我决定应该释放它们。所以我调用TriggerBatch()方法。

private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }

 GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
Run Code Online (Sandbox Code Playgroud)

这就是它的样子。问题是,生成的批次有时包含下一个发布的元素,该元素不应该在那里。

为了显示:

BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
Run Code Online (Sandbox Code Playgroud)

在这一点上,我希望我的批次是{A,A,A},但它是{A,A,A,B}

LikeTriggerBatch()是异步的,SendAsync实际上是在实际批处理之前执行的。

我该如何解决这个问题?我显然不想放在Task.Wait(x)那里(我尝试过,它有效,但当然性能很差)。

Lor*_*sen 5

我也遇到过这个问题,尝试TriggerBatch在错误的地方调用。如前所述,使用的 SlidingWindow 示例DataflowBlock.Encapsulate就是这里的答案,但需要一些时间来适应,所以我想我应该分享我完成的块。

ConditionalBatchBlock创建的批次达到最大大小,如果满足特定条件,可能会更快。在我的特定场景中,我需要创建 100 个批次,但在检测到数据中的某些更改时始终创建新批次。

public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
    var queue = new Queue<T>();

    var source = new BufferBlock<T[]>();

    var target = new ActionBlock<T>(async item =>
    {
        // start a new batch if required by the condition
        if (condition(queue, item))
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }

        queue.Enqueue(item);

        // always send a batch when the max size has been reached
        if (queue.Count == batchSize)
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }
    });

    // send any remaining items
    target.Completion.ContinueWith(async t =>
    {
        if (queue.Any())
            await source.SendAsync(queue.ToArray());

        source.Complete();
    });

    return DataflowBlock.Encapsulate(target, source);
}
Run Code Online (Sandbox Code Playgroud)

对于您的情况,该condition参数可能更简单。我需要查看队列以及当前项目来确定是否创建新批次。

我这样使用它:

public async Task RunExampleAsync<T>()
{
    var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));

    var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));

    conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await ReadDataAsync<T>(conditionalBatchBlock);

    await actionBlock.Completion;
}
Run Code Online (Sandbox Code Playgroud)