izs*_*tas 27 java reactive-programming system.reactive rx-java
请考虑以下示例:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这将输出1到5之间的数字,然后打印异常.
我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.
我已经尝试过使用retry()和其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.
最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.
我该怎么做才能解决这个问题?
Jam*_*rld 17
这是学习Rx时出现的常见问题.
您将异常处理逻辑放在订阅者中的建议优于创建通用可观察包装器.
请记住,Rx是关于将事件推送给订阅者.
从可观察的界面可以清楚地看出,除了处理事件所花费的时间或任何抛出异常中包含的信息之外,没有任何观察者可以知道它的订阅者.
处理订户异常并继续向该订户发送事件的通用包装器是一个坏主意.
为什么?那么observable应该只知道订户现在处于未知的故障状态.在这种情况下继续发送事件是不明智的 - 例如,订阅者处于这样一种情况:从这一点开始的每个事件都将抛出异常并花费一些时间来完成它.
一旦订阅者抛出异常,对于observable只有两个可行的操作过程:
对订户异常的具体处理将是一个糟糕的设计选择; 它会在订阅者和可观察者之间产生不适当的行为耦合.因此,如果您想要对不良订阅者具有弹性,那么上述两种选择实际上是可观察者本身的合理责任限制.
如果您希望您的订户具有弹性并继续运行,那么您应该将其包装在异常处理逻辑中,该逻辑旨在处理您知道如何从中恢复的特定异常(并且可能处理瞬态异常,记录,重试逻辑,断路等) ).
只有订户本身才具有上下文,以了解在面对失败时是否适合接收更多事件.
如果您的情况需要开发可重复使用的错误处理逻辑,请将自己置于包装观察者事件处理程序而不是可观察事件的思维模式中- 并且注意不要盲目地在失败时继续传输事件.发布吧!虽然没有关于Rx的文章,但是有趣的软件工程经典在这最后一点上有很多话要说.如果你还没看过,我强烈建议.