意外行为 - TPL DataFlow BatchBlock在执行TriggerBatch时拒绝项目

Al *_*ros 9 c# tpl-dataflow

当您创建具有有界容量的批处理块并调用triggerBatch时(并行)发布新项目 - 在触发批处理执行时间期间发布新项目将失败.

在输入数据流暂停或减速的情况下,调用触发批处理(每X次)以确保数据在块中不会延迟太长时间.

以下代码将输出一些"post failure"事件.例如:

    public static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
        var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
        batchBlock.LinkTo(actionBlock);

        var producerTask = Task.Factory.StartNew(() =>
        {
            //Post 10K Items
            for (int i = 0; i < 10000; i++)
            {
                var postResult = batchBlock.Post(i);
                if (!postResult)
                    Console.WriteLine("Failed to Post");
            }
        });

        var triggerBatchTask = Task.Factory.StartNew(() =>
            {                    
                //Trigger Batch..
                for (int i = 0; i < 1000000; i++)
                    batchBlock.TriggerBatch();
            });

        producerTask.Wait();
        triggerBatchTask.Wait();
    }

    public static void ProcessBatch(int[] batch)
    {
        Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
    }
Run Code Online (Sandbox Code Playgroud)

*请注意,仅当批处理块有界时,此方案才可重现.

我错过了什么或者是否是batchBlock的问题?

svi*_*ick 5

BatchBlock不会真的拒绝的项目,它试图推迟它。除了在 的情况下Post(),推迟不是一种选择。解决此问题的一种简单方法是使用await batchBlock.SendAsync(i)而不是batchBlock.Post(i)(这也意味着您需要更改Task.Factory.StartNew(() =>Task.Run(async () =>)。

为什么会发生这种情况?根据源代码,如果BatchBlock有界,TriggerBatch()则异步处理,并且在处理过程中,不会接受任何新项目。

在任何情况下,您都不应该期望Post()总是true在有界块上返回,如果块已满,Post()也将返回false