exp*_*ert 6 scala akka akka-stream
我正在构建一个将由第三方使用的库.在我的一个方法中,我返回的Stream[Item]是从分页REST API调用的结果异步生成的.
我正在使用我对BulkPullerAsync的修改.我的代码在这里.
我希望我的流的收件人能够处理错误.根据文档,我应该使用自定义Supervision.Decider.
val decider: Supervision.Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
Run Code Online (Sandbox Code Playgroud)
不幸的是,它没有捕获我的ActionPublisher中抛出的异常.我看到它被处理,ActorPublisher.onError被叫,但它没有到达Supervision.Decider.它适用于文档中提供的简单Stream.
如果我使用错误也不会达到演员Sink.actorRef.
我该怎么办 ?我希望我的用户Stream不应该依赖于其实现的性质.
UPD:为了实验,我尝试了以下样本
val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
Run Code Online (Sandbox Code Playgroud)
在这种情况下,异常被捕获Decider.
UPD2:我尝试通过Publisher从不同类型的源生成然后将它们转换回Source期望所有错误来实现它来欺骗它Subscriber.onError:)显然混合ActorPublisher + Decider有问题...
总的来说,我认为这是不一致的行为.我无法使用一种机制来处理错误Stream.
| 归档时间: |
|
| 查看次数: |
998 次 |
| 最近记录: |