Mic*_*tov 7 c# task-parallel-library tpl-dataflow
我需要构建TPL数据流管道,它将处理大量消息.因为有很多消息我不能简单地将Post它们放入无限队列中,BufferBlock否则我将面临内存问题.所以我想使用BoundedCapacity = 1选项来禁用队列并使用MaxDegreeOfParallelism并行任务处理,因为我的TransformBlocks可能需要一些时间来处理每条消息.我也用来PropagateCompletion完成所有的完成并且无法沿管道向下传播.
但是我正面临着错误处理的问题,当错误发生在第一条消息之后:调用await SendAsync只是将我的应用程序切换到无限等待.
我已将我的案例简化为示例控制台应用程序:
var data_buffer = new BufferBlock<int> (new DataflowBlockOptions
{
BoundedCapacity = 1
});
var process_block = new ActionBlock<int> (x => { throw new InvalidOperationException (); },
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2,
BoundedCapacity = 1
});
data_buffer.LinkTo (process_block, new DataflowLinkOptions { PropagateCompletion = true });
for (var k = 1; k <= 5; k++)
{
await data_buffer.SendAsync (k);
Console.WriteLine ("Send: {0}", k);
}
data_buffer.Complete ();
await process_block.Completion;
Run Code Online (Sandbox Code Playgroud)
这是预期的行为.如果"下游"发生故障,则错误不会向后"向后"传播.网格期望您检测到该故障(例如,通过process_block.Completion)并解决它.
如果你想向后传播错误,你可以有一个await或延续process_block.Completion该故障上游块(S)如果下游块(一个或多个)故障.
请注意,这不是唯一可行的解决方案; 您可能希望重建网格的该部分或将源链接到备用目标.源块没有出现故障,因此可以继续使用已修复的网格进行处理.