Syl*_*ain 3 c# dataflow task-parallel-library
考虑以下示例:
        ActionBlock<TimeSpan> ab = new ActionBlock<TimeSpan>(async _ =>
        {
            await Task.Delay(_);
            throw new Exception();
        }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = Int32.MaxValue });
        ab.Post(TimeSpan.FromSeconds(10d));
        ab.Post(TimeSpan.FromDays(1d));
        await ab.Completion;
Run Code Online (Sandbox Code Playgroud)
不出所料,我在输出窗口中看到10秒后将引发异常,但这不会导致dataFlow完成(等待ab之后的断点。完成将不会在1天前命中)。
就我而言,如果单个计算步骤中有异常,我想取消整个dataFlow。
我看不到如何使用tpl dataflow做到这一点...有什么建议吗?
谢谢...
[edit]如Ofir所述,我可以这样做:
        ActionBlock<TimeSpan> ab = new ActionBlock<TimeSpan>(async _ =>
        {
            try
            {
                await Task.Delay(_);
                throw new Exception();// Or any other Task thay may throw an exception.
            }
            catch(Exception)
            {
                cancelTokenSource.Cancel();
                throw;
            }
        }, new ExecutionDataflowBlockOptions() {CancellationToken=cancelTokenSource.Token, MaxDegreeOfParallelism = Int32.MaxValue });
        ab.Post(TimeSpan.FromSeconds(10d));
        ab.Post(TimeSpan.FromDays(1d));
        await ab.Completion;
Run Code Online (Sandbox Code Playgroud)
但这恰恰是我要避免的事情:一次又一次地键入相同的try {} catch……或更糟糕的是:忘记了...;)
我接下来要做的是写一个替换ActionBlock的构造函数来处理该问题的方法(这将需要一个额外的CancellationTokenSource参数)...我很惊讶这不是直接与数据流一起出现的吗?确实如此吗?
[最终编辑]似乎答案是tpd dataFlow中没有这样的东西,并且ActionBlock "extension constructor" (in fact a static method)接受CancellationTokenSource作为参数将是可能的解决方法...
将Exception你扔会把ActionBlock进入故障状态,并且将放弃所有缓冲的消息删除并不会接受更多的消息。
这是真实的也有CancelationToken(可在提供ExecutionDataflowBlockOptions)
当前处理的消息完成后,您将在AggregateException上收到一个await ab.Completion。
与一样Task,您必须自己照顾已经执行的消息的中止。
为了演示,在您提供的示例中,可以这样实现:
var cancellationTokenSource = new CancellationTokenSource();
var ab = new ActionBlock<TimeSpan>(async _  =>
{
    // await with cancellation token
    await Task.Delay(_, cancellationTokenSource.Token);
    cancellationTokenSource.Cancel();
}, new ExecutionDataflowBlockOptions {CancellationToken = cancellationTokenSource.Token, MaxDegreeOfParallelism = int.MaxValue});
ab.Post(TimeSpan.FromSeconds(10));
ab.Post(TimeSpan.FromSeconds(20));
Thread.Sleep(15000);
cancellationTokenSource.Cancel();
ab.Post(TimeSpan.FromSeconds(100));
try { await ab.Completion; }
catch(TaskCancelationException ex)
{ }
Run Code Online (Sandbox Code Playgroud)
在上述情况下,我们将发布2条消息,这些消息将立即运行。
10秒后,所述第一消息,将引起Cancel的cancellationTokenSource,并且使其他消息(即延迟20秒),以立即完成,并提出了ActionBlock在取消状态。我们尝试发布的下一条消息不会被接受,也不会执行。  
15秒后,我们将TaskCancelationException等待一段时间completion。