请考虑以下示例:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这将输出1到5之间的数字,然后打印异常.
我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.
我已经尝试过使用retry()和其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.
最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.
我该怎么做才能解决这个问题?