ak0*_*817 1 scala actor akka akka-supervision akka-actor
我有一个在应用程序启动时作为另一个 Actor 的子级创建的 Actor,每天从父级接收一条消息,以执行从某个 SFTP 服务器获取某些文件的操作。
现在,可能存在一些轻微的临时连接异常,导致操作失败。在这种情况下,需要重试。
但可能存在抛出异常并且重试时无法解决的情况(例如:找不到文件、某些配置不正确等)
因此,在这种情况下,考虑到参与者将在很长的时间间隔(每天一次)后收到消息,适当的重试机制和监督策略可能是什么。
在这种情况下,发送给参与者的消息并不是错误的输入——它只是一个触发器。例子:
case object FileFetch
Run Code Online (Sandbox Code Playgroud)
如果我在父级中有这样的监督策略,它将在每个次要/主要异常时重新启动失败的子级,而无需重试。
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
case _: Exception => Restart
}
Run Code Online (Sandbox Code Playgroud)
我想要的是这样的:
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
case _: MinorException => Retry same message 2, 3 times and then Restart
case _: Exception => Restart
}
Run Code Online (Sandbox Code Playgroud)
在发生异常时“重试”或重新发送消息是您必须自己实现的事情。从文档中:
\n\n\n\n\n如果在处理一条消息时(即从其邮箱中取出并移交给当前行为)抛出异常,那么这条消息将会丢失。重要的是要了解它不会放回邮箱。因此,如果您想重试处理消息,您需要通过捕获异常并重试流程来自行处理。确保对重试次数进行限制,因为您不希望系统处于活锁状态(因此会消耗大量 CPU 周期而不取得进展)。
\n
如果您想FileFetch在不重新启动子进程的情况下重新向子进程发送消息MinorException,那么您可以捕获子进程中的异常以避免触发监督策略。在 try-catch 块中,您可以向父级发送一条消息,并让父级跟踪重试次数(例如,如果您希望父级制定某种退避策略,则可能在此消息中包含时间戳) 。在孩子身上:
def receive = {\n case FileFetch =>\n try {\n ...\n } catch {\n case m: MinorException =>\n val now = System.nanoTime\n context.parent ! MinorIncident(self, now)\n }\n case ...\n} \nRun Code Online (Sandbox Code Playgroud)\n\n在父级中:
\n\noverride val supervisorStrategy =\n OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {\n case _: Exception => Restart\n }\n\nvar numFetchRetries = 0\n\ndef receive = {\n case MinorIncident(fetcherRef, time) =>\n log.error(s"${fetcherRef} threw a MinorException at ${time}")\n if (numFetchRetries < 3) { // possibly use the time in the retry logic; e.g., a backoff\n numFetchRetries = numFetchRetries + 1\n fetcherRef ! FileFetch\n } else {\n numFetchRetries = 0\n context.stop(fetcherRef)\n ... // recreate the child\n }\n case SomeMsgFromChildThatFetchSucceeded =>\n numFetchRetries = 0\n case ...\n}\nRun Code Online (Sandbox Code Playgroud)\n\nResume或者,您可以在发生 a 时将主管策略设置为子级,而不是捕获子级中的异常MinorException,同时仍然让父级处理消息重试逻辑:
override val supervisorStrategy =\n OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {\n case m: MinorException =>\n val child = sender()\n val now = System.nanoTime\n self ! MinorIncident(child, now)\n Resume\n case _: Exception => Restart\n }\nRun Code Online (Sandbox Code Playgroud)\n