重试长寿 akka actor 的小异常

ak0*_*817 1 scala actor akka akka-supervision akka-actor

我有一个在应用程序启动时作为另一个 Actor 的子级创建的 Actor,每天从父级接收一条消息,以执行从某个 SFTP 服务器获取某些文件的操作。

现在,可能存在一些轻微的临时连接异常,导致操作失败。在这种情况下,需要重试。

但可能存在抛出异常并且重试时无法解决的情况(例如:找不到文件、某些配置不正确等)

因此,在这种情况下,考虑到参与者将在很长的时间间隔(每天一次)后收到消息,适当的重试机制和监督策略可能是什么。

在这种情况下,发送给参与者的消息并不是错误的输入——它只是一个触发器。例子:

case object FileFetch
Run Code Online (Sandbox Code Playgroud)

如果我在父级中有这样的监督策略,它将在每个次要/主要异常时重新启动失败的子级,而无需重试。

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: Exception                => Restart
}
Run Code Online (Sandbox Code Playgroud)

我想要的是这样的:

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: MinorException           => Retry same message 2, 3 times and then Restart
    case _: Exception                => Restart
}
Run Code Online (Sandbox Code Playgroud)

Jef*_*ung 5

在发生异常时“重试”或重新发送消息是您必须自己实现的事情。从文档中:

\n\n
\n

如果在处理一条消息时(即从其邮箱中取出并移交给当前行为)抛出异常,那么这条消息将会丢失。重要的是要了解它不会放回邮箱。因此,如果您想重试处理消息,您需要通过捕获异常并重试流程来自行处理。确保对重试次数进行限制,因为您不希望系统处于活锁状态(因此会消耗大量 CPU 周期而不取得进展)。

\n
\n\n

如果您想FileFetch在不重新启动子进程的情况下重新向子进程发送消息MinorException,那么您可以捕获子进程中的异常以避免触发监督策略。在 try-catch 块中,您可以向父级发送一条消息,并让父级跟踪重试次数(例如,如果您希望父级制定某种退避策略,则可能在此消息中包含时间戳) 。在孩子身上:

\n\n
def receive = {\n  case FileFetch =>\n    try {\n      ...\n    } catch {\n      case m: MinorException =>\n        val now = System.nanoTime\n        context.parent ! MinorIncident(self, now)\n    }\n  case ...\n} \n
Run Code Online (Sandbox Code Playgroud)\n\n

在父级中:

\n\n
override val supervisorStrategy =\n  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {\n    case _: Exception => Restart\n  }\n\nvar numFetchRetries = 0\n\ndef receive = {\n  case MinorIncident(fetcherRef, time) =>\n    log.error(s"${fetcherRef} threw a MinorException at ${time}")\n    if (numFetchRetries < 3) { // possibly use the time in the retry logic; e.g., a backoff\n      numFetchRetries = numFetchRetries + 1\n      fetcherRef ! FileFetch\n    } else {\n      numFetchRetries = 0\n      context.stop(fetcherRef)\n      ... // recreate the child\n    }\n  case SomeMsgFromChildThatFetchSucceeded =>\n    numFetchRetries = 0\n  case ...\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

Resume或者,您可以在发生 a 时将主管策略设置为子级,而不是捕获子级中的异常MinorException,同时仍然让父级处理消息重试逻辑:

\n\n
override val supervisorStrategy =\n  OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {\n    case m: MinorException =>\n      val child = sender()\n      val now = System.nanoTime\n      self ! MinorIncident(child, now)\n      Resume\n    case _: Exception => Restart\n  }\n
Run Code Online (Sandbox Code Playgroud)\n