为什么Akka Streams吞噬了我的异常?

Mat*_*ger 18 scala exception-handling akka-stream

为什么是例外

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(defaultActorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

默默地忽略了?我可以看到流在打印后停止Received 1,但没有记录任何内容.请注意,问题通常不是日志记录配置,因为如果我akka.log-config-on-start = onapplication.conf文件中设置,我会看到很多输出.

Mat*_*ger 15

我现在正在使用一个自定义Supervision.Decider,确保正确记录异常,可以这样设置:

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
Run Code Online (Sandbox Code Playgroud)

另外,正如Vikor Klang所指出的那样,在上面给出的例子中,异常也可能被"抓住"了

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}
Run Code Online (Sandbox Code Playgroud)

但请注意,这种方法对您没有帮助

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()
Run Code Online (Sandbox Code Playgroud)

自从run()回归Unit.

  • `run()`只返回`Unit`,因为它默认保留"左"侧(Keep.left)的物化值.如果你曾经使用过:toMat(Sink.foreach(...))(Keep.right)那么它会再次起作用. (2认同)

exp*_*ert 5

当我开始使用akk-streams时,我遇到了类似的问题.Supervision.Decider帮助但并不总是.

不幸的是,它没有捕获异常ActionPublisher.我看到它被处理,ActorPublisher.onError被叫,但它没有到达Supervision.Decider.它适用于文档中提供的简单Stream.

如果我使用错误也不会达到演员Sink.actorRef.

为了实验,我尝试了以下样本

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
Run Code Online (Sandbox Code Playgroud)

在这种情况下,Decider捕获了异常,但从未到达actor订阅者.

总的来说,我认为这是不一致的行为.我不能使用一种机制来处理Stream中的错误.

我原来的SO问题:Custom Supervision.Decider没有捕获ActorPublisher产生的异常

这里是跟踪的akka​​问题:https://github.com/akka/akka/issues/18359