Custom Supervision.Decider不捕获ActorPublisher产生的异常

exp*_*ert 6 scala akka akka-stream

我正在构建一个将由第三方使用的库.在我的一个方法中,我返回的Stream[Item]是从分页REST API调用的结果异步生成的.

我正在使用我对BulkPullerAsync的修改.我的代码在这里.

我希望我的流的收件人能够处理错误.根据文档,我应该使用自定义Supervision.Decider.

val decider: Supervision.Decider = {
  case ex =>
    ex.printStackTrace()
    Supervision.Stop
}

implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
Run Code Online (Sandbox Code Playgroud)

不幸的是,它没有捕获我的ActionPublisher中抛出的异常.我看到它被处理,ActorPublisher.onError被叫,但它没有到达Supervision.Decider.它适用于文档中提供的简单Stream.

如果我使用错误也不会达到演员Sink.actorRef.

我该怎么办 ?我希望我的用户Stream不应该依赖于其实现的性质.

UPD:为了实验,我尝试了以下样本

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
Run Code Online (Sandbox Code Playgroud)

在这种情况下,异常被捕获Decider.

UPD2:我尝试通过Publisher从不同类型的源生成然后将它们转换回Source期望所有错误来实现它来欺骗它Subscriber.onError:)显然混合ActorPublisher + Decider有问题...

总的来说,我认为这是不一致的行为.我无法使用一种机制来处理错误Stream.