具有有界容量的变换块中的TPL数据流异常

Mic*_*tov 7 c# task-parallel-library tpl-dataflow

我需要构建TPL数据流管道,它将处理大量消息.因为有很多消息我不能简单地将Post它们放入无限队列中,BufferBlock否则我将面临内存问题.所以我想使用BoundedCapacity = 1选项来禁用队列并使用MaxDegreeOfParallelism并行任务处理,因为我的TransformBlocks可能需要一些时间来处理每条消息.我也用来PropagateCompletion完成所有的完成并且无法沿管道向下传播.

但是我正面临着错误处理的问题,当错误发生在第一条消息之后:调用await SendAsync只是将我的应用程序切换到无限等待.

我已将我的案例简化为示例控制台应用程序:

var data_buffer = new BufferBlock<int> (new DataflowBlockOptions
                                        {
                                            BoundedCapacity = 1
                                        });

var process_block = new ActionBlock<int> (x => { throw new InvalidOperationException (); },
                                            new ExecutionDataflowBlockOptions
                                            {
                                                MaxDegreeOfParallelism = 2,
                                                BoundedCapacity = 1
                                            });

data_buffer.LinkTo (process_block, new DataflowLinkOptions { PropagateCompletion = true });


for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync (k);
    Console.WriteLine ("Send: {0}", k);
}

data_buffer.Complete ();

await process_block.Completion;
Run Code Online (Sandbox Code Playgroud)

Ste*_*ary 6

这是预期的行为.如果"下游"发生故障,则错误不会向后"向后"传播.网格期望您检测到该故障(例如,通过process_block.Completion)并解决它.

如果你想向后传播错误,你可以有一个await或延续process_block.Completion故障上游块(S)如果下游块(一个或多个)故障.

请注意,这不是唯一可行的解​​决方案; 您可能希望重建网格的该部分或将源链接到备用目标.源块没有出现故障,因此可以继续使用已修复的网格进行处理.

  • @pnewhook:我更喜欢在故障上拆掉整个网格(我想大多数人会这样做),但我们没有*到.如果块没有传播完成并且它出现故障,那么您可以将其与其他块取消链接并放入替换块(而其余的网格仍然在运行). (4认同)
  • @MichaelLogutov:确实如此.它会将任何错误从"BufferBlock"传播到"ActionBlock".但是数据和完成/故障都不会在链路上传播*向后传播*. (3认同)