当异步处理程序出现异常时,Akka.Net PreRestart 未执行

lbr*_*him 1 c# task akka akka.net

我有以下演员,我正在尝试重新启动并将失败的消息重新发送回演员:

public class BuildActor : ReceivePersistentActor
{
    public override string PersistenceId => "asdad3333";

    private readonly IActorRef _nextActorRef;

    public BuildActor(IActorRef nextActorRef)
    {
        _nextActorRef = nextActorRef;

        Command<Workload>(x => Build(x));

        RecoverAny(workload =>
        {
            Console.WriteLine("Recovering");
        });
    }

    public void Build(Workload Workload)
    {
        var context = Context;
        var self = Self;

        Persist(Workload, async x =>
        {
            //after this line executes
            //application goes into break mode
            //does not execute PreStart or Recover
            var workload = await BuildTask(Workload);

            _nextActorRef.Tell(workload);

            context.Stop(self);
        });
    }

    private Task<Workload> BuildTask(Workload Workload)
    {
        //works as expected if method made synchronous
        return Task.Run(() =>
        {
            //simulate exception
            if (Workload.ShowException)
            {
                throw new Exception();
            }

            return Workload;
        });
    }

    protected override void PreRestart(Exception reason, object message)
    {
        if (message is Workload workload)
        {
            Console.WriteLine("Prestart");

            workload.ShowException = false;

            Self.Tell(message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

在成功处理程序中,Persist我尝试模拟抛出异常,但在异常时应用程序进入中断模式并且PreRestart不会调用钩子。但是,如果我BuildTask通过删除来使方法同步Task.Run,则​​在异常时都会调用PreRestart和方法。Recover<T>

如果有人能指出我应该推荐什么模式以及我哪里出错了,我将非常感激。

Bar*_*ski 5

最有可能的是,Akka.Persistence 并不是解决您的问题的好方法。

Akka.Persistence 使用事件源原则来存储 Actor 的状态。在这方面有几个重要的关键点:

  • 您发送给演员的是一条命令。它描述了一项你想要完成的工作。执行该命令可能会导致进行一些实际处理,并最终可能导致以事件的形式持久保存参与者的线性状态更改历史
  • 在 Akka.NET 中,Persist方法仅用于存储事件- 它们描述了发生了某些事情的事实:因此,它们不能被拒绝,也不能失败(您在Persist回调中所做的事情)。
  • 当一个 Actor 在任何时间点重新启动时,它总是会尝试通过重播Persist直到最后一个已知时间点的所有事件来重新创建自己的状态。因此,重要的是该Recover方法应该只专注于重放参与者的状态(可以在同一事件上多次调用它)并且永远不会导致副作用(副作用的示例是发送电子邮件)。那里抛出的任何异常都意味着该 Actor 状态已不可恢复地损坏并且该 Actor 将被杀死。

如果您想将消息重新发送给您的演员,您可以:

  1. 将可靠的消息队列(即 RabbitMQ 或 Azure 服务总线)或日志(Kafka 或事件中心)放在参与者处理管道前面。这实际上是很多情况下最合理的场景。
  2. 使用Akka.Persistence 中的至少一次传递语义 - 但恕我直言,只有当由于某种原因你不能使用第一个解决方案时。
  3. 最简单且不可靠的选项(因为消息仅驻留在内存中并且从不持久化)是死信队列。每条未处理的消息都会发送到那里。您可以订阅它并过滤传入的数据,以检测哪些消息应再次发送给收件人。