如果抛出异常,Akka Actor不会终止

fre*_*oma 73 routing scala fault-tolerance actor akka

我目前正试图开始使用Akka,我面临一个奇怪的问题.我的演员有以下代码:

class AkkaWorkerFT extends Actor {
  def receive = {
    case Work(n, c) if n < 0 => throw new Exception("Negative number")
    case Work(n, c) => self reply n.isProbablePrime(c);
  }
}
Run Code Online (Sandbox Code Playgroud)

这就是我开始工作的方式:

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()
Run Code Online (Sandbox Code Playgroud)

这就是我关闭所有东西的方式:

futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill
Run Code Online (Sandbox Code Playgroud)

现在发生的事情是,如果我发送工具消息,其中n> 0(没有抛出异常),一切正常,应用程序正常关闭.但是,只要我发送一条导致异常的消息,应用程序就不会终止,因为仍有一个actor正在运行,但我无法弄清楚它来自哪里.

如果它有帮助,这是有问题的线程的堆栈:

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) 
    Unsafe.park(boolean, long) line: not available [native method]  
    LockSupport.park(Object) line: 158  
    AbstractQueuedSynchronizer$ConditionObject.await() line: 1987   
    LinkedBlockingQueue<E>.take() line: 399 
    ThreadPoolExecutor.getTask() line: 947  
    ThreadPoolExecutor$Worker.run() line: 907   
    MonitorableThread(Thread).run() line: 680   
    MonitorableThread.run() line: 182   
Run Code Online (Sandbox Code Playgroud)

PS:没有终止的线程不是任何工作线程,因为我添加了一个postStop回调,每一个都正确停止.

PPS:Actors.registry.shutdownAll解决问题的方法,但我认为shutdownAll应该只作为最后的手段使用,不应该吗?

Arn*_*-Oz 21

处理akka actor内部问题的正确方法不是抛出异常,而是设置supervisor层次结构

"在并发代码中抛出异常(让我们假设我们使用非链接的actor),只会简单地炸掉当前执行actor的线程.

没有办法发现事情出错了(除了检查堆栈跟踪).你无能为力."

通过Supervisor层次结构查看容错(1.2)

*注意*以上适用于旧版本的Akka(1.2)在较新版本(例如2.2)中,您仍然设置了一个主管层次结构,但它会捕获子进程抛出的异常.例如

class Child extends Actor {
    var state = 0
    def receive = {
      case ex: Exception ? throw ex
      case x: Int        ? state = x
      case "get"         ? sender ! state
    }
  }
Run Code Online (Sandbox Code Playgroud)

在主管:

class Supervisor extends Actor {
    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException      ? Resume
        case _: NullPointerException     ? Restart
        case _: IllegalArgumentException ? Stop
        case _: Exception                ? Escalate
      }

    def receive = {
      case p: Props ? sender ! context.actorOf(p)
    }
  }
Run Code Online (Sandbox Code Playgroud)

通过Supervisor层次结构查看容错(2.2)