Sou*_*nta 8 scala fault-tolerance akka error-kernel
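I have a very simple example. An actor (SimpleActor) performs a periodic task by sending messages to itself, and the message is scheduled in the actor's constructor. In the normal case (i.e., without faults) everything works fine.
But what if the actor has to deal with faults? I have another actor (SimpleActorWithFault) that can fail; here I generate a fault by throwing an exception. When the fault occurs (i.e., SimpleActorWithFault throws an exception), the actor is automatically restarted. However, the restart messes up the scheduler inside the actor, which no longer behaves as expected. If the faults happen quickly enough, the behavior becomes even more unpredictable.
My question is: what is the preferred way to handle faults in such cases? I know I can use try blocks to handle exceptions. But what if I am extending another actor and cannot put a try block in the superclass, or what about cases where an unexpected fault occurs in the actor?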
import akka.actor.{Actor, ActorLogging, Props}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

case object MessageA
case object MessageToSelf

class SimpleActor extends Actor with ActorLogging {
  // schedule a message to self every second
  context.system.scheduler.schedule(0.seconds, 1.second, self, MessageToSelf)

  // keeps track of some internal state
  var count: Int = 0

  def receive: Receive = {
    case MessageA =>
      log.info("[SimpleActor] Got MessageA at %d".format(count))
    case MessageToSelf =>
      // update state and tell the world about its current state
      count = count + 1
      log.info("[SimpleActor] Got scheduled message at %d".format(count))
  }
}

class SimpleActorWithFault extends Actor with ActorLogging {
  // schedule a message to self every second
  context.system.scheduler.schedule(0.seconds, 1.second, self, MessageToSelf)

  var count: Int = 0

  def receive: Receive = {
    case MessageA =>
      log.info("[SimpleActorWithFault] Got MessageA at %d".format(count))
    case MessageToSelf =>
      count = count + 1
      log.info("[SimpleActorWithFault] Got scheduled message at %d".format(count))
      // at some point generate a fault
      if (count > 5) {
        log.info("[SimpleActorWithFault] Going to throw an exception now %d".format(count))
        throw new Exception("Excepttttttiooooooon")
      }
  }
}

object MainApp extends App {
  implicit val akkaSystem = akka.actor.ActorSystem()
  // Run the actor without any faults or exceptions
  akkaSystem.actorOf(Props(classOf[SimpleActor]))
  // comment out the line above and uncomment the following to run the actor with faults
  // akkaSystem.actorOf(Props(classOf[SimpleActorWithFault]))
}
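The correct approach is to push the dangerous behavior down into its own actor. This pattern is called the Error Kernel pattern (see Akka Concurrency, Section 8.5):
This pattern describes a very common-sense approach to supervision that differentiates actors from one another based on any volatile state that they may hold.
In a nutshell, it means that actors whose state is precious should not be allowed to fail or restart. Any actor that holds precious data is protected such that any risky operations are relegated to a slave actor who, if restarted, only causes good things to happen.
The error kernel pattern implies pushing levels of risk further down the tree.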
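To make the idea concrete without depending on Akka, here is a plain-Scala model of the error kernel. It is a sketch under simplifying assumptions: `Kernel`, `RiskyWorker` and `process` are hypothetical names, not Akka API, and the failure is caught synchronously, whereas in Akka the child's failure is reported to its supervisor, which then applies its strategy. The kernel owns the precious state; the risky work goes to a disposable worker that is simply replaced (the analogue of a restart) when it fails.

```scala
import scala.util.control.NonFatal

// Hypothetical disposable worker (plays the role of ActorWithFault):
// it performs the risky operation and may throw.
final class RiskyWorker {
  def process(n: Int): Int =
    if (n % 3 == 0) throw new IllegalStateException(s"cannot process $n")
    else n * 2
}

// Hypothetical kernel (plays the role of SimpleActor): it owns the precious
// state and never lets a worker failure destroy it.
final class Kernel(newWorker: () => RiskyWorker) {
  private var worker = newWorker()
  private var total = 0    // precious state, must survive worker failures
  private var restarts = 0 // how often the worker was replaced

  def submit(n: Int): Unit =
    try {
      total += worker.process(n)
    } catch {
      case NonFatal(_) =>
        worker = newWorker() // "restart" only the worker; total is untouched
        restarts += 1
    }

  def state: (Int, Int) = (total, restarts)
}

object ErrorKernelDemo extends App {
  val kernel = new Kernel(() => new RiskyWorker)
  (1 to 6).foreach(kernel.submit)
  println(kernel.state) // (24,2): inputs 3 and 6 failed, the sum of the rest survived
}
```

The design point is that the failure boundary sits between the two objects: whatever goes wrong inside the worker, the kernel's `total` is never reset.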
So in your case it would look something like this:
SimpleActor
|- ActorWithFault
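Here SimpleActor acts as the supervisor of ActorWithFault. The default supervision strategy of any actor restarts a child that fails with an Exception (except ActorInitializationException, ActorKilledException and DeathPactException, which stop the child) and escalates anything else: http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html
Escalating means that the supervising actor itself may get restarted. Since you really do not want SimpleActor to restart, you can override its supervisor strategy so that ordinary failures always restart ActorWithFault instead of being escalated: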
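import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorLogging, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Escalate, Restart}

class SimpleActor extends Actor with ActorLogging {
  override def preStart(): Unit = {
    // our faulty actor --- we will supervise it from now on
    context.actorOf(Props[ActorWithFault], "FaultyActor")
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case _: ActorKilledException         => Escalate
    case _: ActorInitializationException => Escalate
    case _                               => Restart // keep restarting faulty actor
  }

  // ... (rest of SimpleActor, including receive)
}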
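A supervisor strategy is driven by a decider, a `PartialFunction[Throwable, Directive]` that maps each failure of a child to Resume, Restart, Stop or Escalate. The plain-Scala model below compares a rough copy of Akka's default decider with the override above. It is a sketch, not Akka's API: the `Directive` ADT mirrors Akka's directives, while `InitFailed` and `Killed` are hypothetical stand-ins for `ActorInitializationException` and `ActorKilledException` (DeathPactException is left out).

```scala
// Plain-Scala model of supervisor deciders (not Akka's actual types).
sealed trait Directive
case object Resume extends Directive
case object Restart extends Directive
case object Stop extends Directive
case object Escalate extends Directive

// Hypothetical stand-ins for akka.actor.ActorInitializationException
// and akka.actor.ActorKilledException.
final class InitFailed extends Exception("init failed")
final class Killed extends Exception("killed")

object Deciders {
  // Same shape as akka.actor.SupervisorStrategy.Decider.
  type Decider = PartialFunction[Throwable, Directive]

  // Roughly the default: stop on init failure or Kill, restart on any
  // other Exception, escalate every other Throwable (e.g. an Error).
  val default: Decider = {
    case _: InitFailed => Stop
    case _: Killed     => Stop
    case _: Exception  => Restart
    case _             => Escalate
  }

  // The override from the answer: only the two special cases escalate,
  // every other failure just restarts the child.
  val keepRestarting: Decider = {
    case _: Killed     => Escalate
    case _: InitFailed => Escalate
    case _             => Restart
  }
}

object DeciderDemo extends App {
  val failures: List[Throwable] =
    List(new RuntimeException("boom"), new Error("fatal"), new Killed)
  for (t <- failures)
    println(s"${t.getClass.getSimpleName}: default=${Deciders.default(t)}, " +
      s"keepRestarting=${Deciders.keepRestarting(t)}")
}
```

The `Error` case shows the practical difference: under the default decider it is escalated, so SimpleActor's own supervisor decides what happens to SimpleActor, while under the override only the child is restarted and SimpleActor keeps its state.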
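To avoid messing up the scheduler, start it in preStart and cancel it in postStop:
import akka.actor.{Actor, ActorLogging, Cancellable}
import scala.concurrent.duration._

class SimpleActor extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext used by the scheduler

  private var cancellable: Option[Cancellable] = None

  override def preStart(): Unit = {
    // schedule a message to self every second
    // (schedule is deprecated since Akka 2.6 in favour of scheduleWithFixedDelay/scheduleAtFixedRate)
    cancellable = Option(context.system.scheduler.schedule(0.seconds, 1.second, self, MessageToSelf))
  }

  override def postStop(): Unit = {
    // cancel the timer so that a restarted instance does not end up with a second one
    cancellable.foreach(_.cancel())
    cancellable = None
  }

  // ... (receive as before)
}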
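This explains the odd behavior in the question. On every restart Akka creates a fresh actor instance, so a schedule started in the constructor runs again, while the previous schedule keeps sending to the same ActorRef. By default, preRestart calls postStop on the failing instance and postRestart calls preStart on the new one, which is why the preStart/postStop pairing keeps exactly one timer alive. The plain-Scala model below counts live timers across simulated restarts. `FakeScheduler`, `Lifecycle` and `RestartModel` are hypothetical names; no Akka is involved.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the Akka scheduler: it only tracks live timers.
final class FakeScheduler {
  private val live = mutable.Set.empty[Int]
  private var nextId = 0
  def schedule(): Int = { nextId += 1; live += nextId; nextId }
  def cancel(id: Int): Unit = live -= id
  def liveTimers: Int = live.size
}

// Minimal model of the two lifecycle hooks involved in a restart.
trait Lifecycle {
  def preStart(): Unit = ()
  def postStop(): Unit = ()
}

// Schedules in the constructor and never cancels (like SimpleActorWithFault).
final class ConstructorScheduling(s: FakeScheduler) extends Lifecycle {
  s.schedule()
}

// Schedules in preStart and cancels in postStop (like the fix above).
final class HookScheduling(s: FakeScheduler) extends Lifecycle {
  private var timer: Option[Int] = None
  override def preStart(): Unit = { timer = Some(s.schedule()) }
  override def postStop(): Unit = { timer.foreach(s.cancel); timer = None }
}

object RestartModel {
  // Start once, then restart n times. Each restart mirrors the default hooks:
  // postStop on the old instance, a new instance, preStart on the new one.
  def run(n: Int, make: FakeScheduler => Lifecycle): Int = {
    val s = new FakeScheduler
    var current = make(s)
    current.preStart()
    for (_ <- 1 to n) {
      current.postStop()
      current = make(s)
      current.preStart()
    }
    s.liveTimers
  }
}

object SchedulerLeakDemo extends App {
  println(RestartModel.run(3, new ConstructorScheduling(_))) // 4
  println(RestartModel.run(3, new HookScheduling(_)))        // 1
}
```

With constructor scheduling, every restart adds one more timer, so after three restarts the actor receives MessageToSelf four times per second.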
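To handle exceptions inside the actor instead, catch them explicitly. Replying with akka.actor.Status.Failure matters when the sender used the ask pattern: the Future returned by ask then fails with the exception instead of timing out.
// ...
def receive: Receive = {
  case MessageA =>
    try {
      log.info("[SimpleActor] Got MessageA at %d".format(count))
    } catch {
      case e: Exception =>
        // lets an ask-based caller see the failure instead of a timeout
        sender() ! akka.actor.Status.Failure(e)
        // cause first, so the stack trace is logged
        log.error(e, e.getMessage)
    }
  // ...
}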
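The question also asks what to do when you cannot add a try block to an inherited receive. Akka's `Receive` is a type alias for `PartialFunction[Any, Unit]`, so one option is to wrap the whole receive. The sketch below uses only the standard library, and `safely` is a hypothetical helper, not part of Akka. Keep in mind that anything caught this way never reaches the supervisor, so use it only for failures you genuinely want to handle locally.

```scala
import scala.util.control.NonFatal

object SafeReceive {
  // Same shape as akka.actor.Actor.Receive.
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical helper: non-fatal exceptions thrown by pf are passed to
  // onError instead of propagating (and triggering supervision).
  def safely(pf: Receive)(onError: (Any, Throwable) => Unit): Receive =
    new PartialFunction[Any, Unit] {
      def isDefinedAt(msg: Any): Boolean = pf.isDefinedAt(msg)
      def apply(msg: Any): Unit =
        try pf(msg)
        catch { case NonFatal(e) => onError(msg, e) }
    }
}

object SafeReceiveDemo extends App {
  import SafeReceive._

  val errors = scala.collection.mutable.ListBuffer.empty[String]
  // Stands in for a receive inherited from a superclass you cannot edit.
  val inherited: Receive = {
    case "ok"   => println("handled ok")
    case "boom" => throw new RuntimeException("boom")
  }
  val receive = safely(inherited)((msg, e) => errors += s"$msg: ${e.getMessage}")

  receive("ok")
  receive("boom")
  println(errors.toList)                  // List(boom: boom)
  println(receive.isDefinedAt("unknown")) // false
}
```

Inside an actor this could read something like `override def receive: Receive = SafeReceive.safely(super.receive) { (_, e) => sender() ! akka.actor.Status.Failure(e) }`. Because `isDefinedAt` is delegated unchanged, messages the inherited receive does not handle still go to `unhandled` as before.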