处理Akka演员异常的最佳实践

max*_*dim 9 java akka

我有以下任务,我的Java/Executors解决方案运行良好,但我想在Akka中实现相同的功能并寻找最佳实践建议.

问题:

从多个URL并行获取/解析数据,阻塞直到获取所有数据并返回聚合结果.应该重试错误(IOException等)达到一定次数.

到目前为止,我的实现非常简单 - 创建Fetcher actor,它知道应该获取哪些URL,它创建一堆Worker actor并发送它们URL,每个消息一个.完成特定的URL Worker后,将结果发送回Fetcher.Fetcher保持结果状态,工人无国籍.以下简化代码.

提取程序:

class Fetcher extends UntypedActor {
  private ActorRef worker;

  public void onReceive(Object message) throws Exception {
    if (message instanceof FetchMessage) {
      this.worker = context().actorOf(SpringExtension.SpringExtProvider.get(actorSystem).props("Worker")
              .withRouter(new RoundRobinPool(4)), "worker");
      for(URL u: urls) {
        this.worker.tell(new WorkUnit(u), getSelf());
      }
   }
   else if (message instanceof Result) {
     // accumulate results
   }
}
Run Code Online (Sandbox Code Playgroud)

工人:

class Worker extends UntypedActor {

  public void onReceive(Object message) throws Exception {
    if (message instanceof WorkUnit) {
      // fetch URL, parse etc
      // send result back to sender
      getSender().tell(new Result(...), null);
    }
}
Run Code Online (Sandbox Code Playgroud)

到目前为止,如此好,没有例外,一切都按预期工作.

但是如果在Worker中获取URL时发出IOException,那么Akka会重新启动Worker actor,但是当时Worker正在处理的消息将丢失.即使我使用不同的SupervisorStrategy,结果也是一样的 - 有些消息实际上"丢失"了.当然我可以使用try/catch在Worker.onReceive()中包装代码,但我觉得这违背了Akka哲学.我想我可以使用持久性消息传递,但我不认为在这种情况下消息持久性的复杂性是合理的.

我可能需要某种方式让Fetcher弄清楚工人是否未能获取一些URL并再次重新发送WorkUnit或检测到某些结果没有回来太长时间.处理这种情况的最佳方法是什么?

谢谢,

max*_*dim 0

由于还没有人回答这个问题,这就是我到目前为止发现的。在我看来,对于我的情况,带有显式确认的邮箱是最合适的。下面是修改后的代码的样子。

首先,在类路径中的 pee-dispatcher.conf 文件中定义 peek-dispatcher 和 rssWorker 的部署:

peek-dispatcher {
  mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
  max-retries = 10
}

akka.actor.deployment {
  /rssFetcher/rssWorker {
    dispatcher = peek-dispatcher
    router = round-robin
    nr-of-instances = 4
  }
}
Run Code Online (Sandbox Code Playgroud)

使用上面的配置创建 ActorSystem:

ActorSystem system = ActorSystem.create("Akka", ConfigFactory.load("peek-dispatcher.conf"));
Run Code Online (Sandbox Code Playgroud)

Fetcher 几乎保持原样,只有当我们在配置文件中定义路由器时,才能简化 Worker Actor 的创建

this.worker = getContext().actorOf(SpringExtension.SpringExtProvider.get(actorSystem).props("worker"), "worker");
Run Code Online (Sandbox Code Playgroud)

另一方面,工作人员会在处理的最后添加额外的行来确认消息。如果出现任何错误,消息将不会得到确认,并将保留在收件箱中,以便再次重新投递最多“最大重试”次数,如配置中指定:

class Worker extends UntypedActor {

  public void onReceive(Object message) throws Exception {
    if (message instanceof WorkUnit) {
      // fetch URL, parse etc
      // send result back to sender
      getSender().tell(new Result(...), null);
      // acknowledge message
      PeekMailboxExtension.lookup().ack(getContext());
    }
}
Run Code Online (Sandbox Code Playgroud)

注意:我不确定 PeekMailboxExtension.lookup().ack(getContext()); 是调用确认的正确方法,但它似乎有效

这也可能与 Workers 的 SupervisorStrategy.resume() 结合使用 - 由于 Worker 没有状态,它可以在错误后恢复消息的消耗,我认为不需要重新启动 Worker。