如何将 ActionBlock 的所有异常包装在单个 AggregateException 中

yBo*_*her 1 c# exception task-parallel-library tpl-dataflow

我遇到了 TPL ActionBlock,它对于(限制的)并行异步操作似乎非常方便。到目前为止,我正在使用Task.WhenAll()(+Semaphore进行节流)。当谈到例外时,似乎存在很大差异:

var successList = new List<int>();
var failedList = new List<int>();
try
{
    var actionBlock = new ActionBlock<int>(
        async x => await Task.Run(() =>
        {
            if (x < 5)
            {
                failedList.Add(x);
                throw new Exception(x.ToString());
            }

            successList.Add(x);
        }),
        new ExecutionDataflowBlockOptions());

    Enumerable.Range(1, 10).Each(x => actionBlock.Post(x));
    actionBlock.Complete();

    await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
    // works for approach using task.whenall
    Console.WriteLine(ex);
    Assert.True(failedList.Count == 4);
    Assert.True(successList.Count == 6);
    return;
}

Assert.Fail();
Run Code Online (Sandbox Code Playgroud)

该测试失败,因为ActionBlock发生异常时立即停止。我发现这是 github 上的一个问题:Dataflow: Add options for Task Faulting。显然这种行为是不可配置的。

Task.WhenAll()与这样的扩展方法结合使用:

public static async Task PreserveAllExceptions(this Task task)
{
    try
    {
        await task.ConfigureAwait(false);
    }
    catch
    {
        throw task.Exception;
    }
}
Run Code Online (Sandbox Code Playgroud)

将所有(!)异常包装在一个AggregateException但继续处理中:

  await Task.WhenAll(task1,task2).PreserveAllExceptions().ConfigureAwait(false);
Run Code Online (Sandbox Code Playgroud)

有没有一种方便的方法来实现这一点ActionBlock

更新:澄清一下:

  1. 我不打算使用信号量进行节流(为什么我会这样做?),因为已经有这样的选项ExecutionDataflowBlockOptions
  2. 代码片段只是一个演示“问题”的虚拟代码;Task.Run()仅用作实际异步函数的占位符。
  3. 我真正想做的是:以并行方式处理所有消息。不要因错误而取消进一步的消息处理。处理完所有消息后,返回并指示至少发生了一个错误并返回所有错误 -> 我的Task.WhenAll()with AggregateException 的工作原理。我知道我可以try{}catch{}在我的内部ActionBlock并以某种方式存储异常,但我想知道是否有任何配置的可能性可以使这变得更容易。不管怎样,在我使用的任何地方使用 try catch 并收集异常并不是什么大不了的事情ActionBlock。我只是发现Task.WhenAll()+PreserveException扩展对于我的目的来说更干净。

Pan*_*vos 6

目前还不清楚这个问题问什么。但很明显的是 ActionBlock 被滥用了。没有必要,Task.Run因为 ActionBlock 已经使用了一项或多项工作任务。不需要信号量,因为 ActionBlock(和其他块)已经通过限制工作任务和输入队列的数量来支持节流。

该代码似乎也尝试使用异常作为控制流机制,这在过程代码中也是错误的。

异常并不意味着要逃脱阻塞。数据流是一种与常见过程范式完全不同的计算范式——没有函数相互调用,因此没有调用者来接收和处理异常。

在数据流中,块以单一方向相互传递消息。块在管道或网络中组合,接收消息,处理它们并将它们传递到任何连接的块。如果发生异常,则没有“调用者”可以接收该异常。未处理的异常是灾难性的,会导致整个管道瘫痪——不仅仅是单个块,而是它链接到的任何设置为PropagateCompletiontrue 的下游块。但上游区块永远不会知道这一点,从而导致意想不到的情况。

节流

使用 ActionBlock 进行限制很容易 - 对于初学者来说,所有块仅使用一个工作任务。人们可以通过限制上游调用者的输入缓冲区并使用await block.SendAsync()而不是 来限制上游调用者block.Post。不需要,Task.Run因为该块已经使用了工作任务:

var options=new ExecutionDataflowBlockOptions 
{ 
    MaxDegreeOfParallelism=2, 
    BoundedCapacity=2
};
var block =new ActionBlock<Message>(processMessage,options);

...
async Task processMessage(Message msg) { ...}

Run Code Online (Sandbox Code Playgroud)

这足以仅允许两个并发操作,并且如果已经有两条消息在等待,则停止发布者。如果缓冲区已满,SendAsync则以下代码将等待直到有可用槽:

foreach(var msg in someInputCollection)
{
    await block.SendAsync(msg);
}
Run Code Online (Sandbox Code Playgroud)

就是这样。该块将同时处理 2 条消息(默认值为 1),并且在其输入缓冲区中一次仅接受 2 条消息。如果缓冲区已满,则发布循环将等待。

Quick & dirty 速率限制可以通过在处理方法中添加延迟来实现:

var block =new ActionBlock<Message>(msg=>{
    await Task.Delay(200);
    await processMessage(msg);
},options);
Run Code Online (Sandbox Code Playgroud)

条件路由

问题的代码似乎使用异常来实现控制流。这在任何库或范例中都是错误的。由于数据流在网络中工作,因此控制流相当于条件路由。

这也可以通过LinkTo重载来实现,该重载接受一个predicate参数来决定消息是否应该沿着特定链接传递。

在问题的情况下,假设有一个TransformBlock生成整数的上游,LinkTo可用于将消息路由到不同的 BufferBlocks:

var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();

var block=new TransformBlock<Message,int>(...);
//Success if x>=5
block.LinkTo(success,x=>x>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);
Run Code Online (Sandbox Code Playgroud)

就是这样。唯一的“技巧”是谓词应该涵盖所有选项,否则消息将保留在block的输出缓冲区中。在所有其他链接之后使用default链接有助于确保没有未处理的消息。

错误处理

块不应该允许异常逃逸。有多种错误处理策略,具体取决于应用程序想要执行的操作。

句柄和日志

一种选择是处理它们并将它们记录到位,就像处理 Web 应用程序中的错误一样:

var block =new ActionBlock(msg=>{ try { wait processMessage(msg); } catch(Exception exc) { _logger.LogError(exc,....); } },options);

发布到另一个块

另一种可能性是将异常以及可能有关传入消息的信息直接发布到另一个块。该块可以记录错误和消息,或者在延迟后重试。该块后面可能有一个不同的管道,用于在将消息发送到死信缓冲区之前以增加的延迟重试消息,类似于对消息队列所做的操作:

var block =new ActionBlock<Message>(msg=>{
    try
    {
        await processMessage(msg);
    }
    catch(SomeRetriableException exc)
    {
        _retryBlock.Post(new RetryMsg(msg,exc));
    }
    catch(Exception exc)
    {
       
       _logger.LogError(exc,....);
    }
},options);
Run Code Online (Sandbox Code Playgroud)

使用的策略取决于应用程序的用途。如果 ActionBlock 用作简单的后台工作程序,则仅记录日志可能就足够了。

包裹和路线

在更高级的场景中,消息可以包装在Envelope<>携带消息和可能的任何异常的 中。路由可用于区分成功和失败消息:

class Envelope<T>
{
    public T Message{get;}
    public Exception Error {get;}
    
    public Envelope (T msg)
    {
        Message=msg;
    }
    
    public Envelope(T msg,Exception err)
    {
        Message=msg;
        Error=err;
    }
}
Run Code Online (Sandbox Code Playgroud)

该块现在返回一个 Envelope :

var block=new TransformBlock<Envelope<Message>,Envelope<int>>(env=>{
    try
    {
        var msg=env.Message;
        ....
        return new Envelope(6);
    }
    catch(Exception exc)
    {
        return new Envelope(msg,exc);
    }
});

Run Code Online (Sandbox Code Playgroud)

这允许errorBlock使用条件路由将错误路由到:

var errorBlock = ActionBlock<Envelope<Message>>(...);

var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();

//Send errors to `errorBlock`
block.LinkTo(errorBlock,env=>env.Error!=null);

//Success if x>=5
block.LinkTo(success,x=>x.Message>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);
Run Code Online (Sandbox Code Playgroud)