请考虑以下示例:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这将输出1到5之间的数字,然后打印异常.
我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.
我已经尝试过使用retry()和其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.
最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.
我该怎么做才能解决这个问题?
我有一个场景,我有多个IObservable序列,我想结合,Merge然后听.但是,如果其中一个产生错误,我不希望它崩溃其他流的所有内容,以及重新订阅序列(这是一个'永久'序列).
我这样做是通过Retry()在合并之前向流附加一个,即:
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
Run Code Online (Sandbox Code Playgroud)
但是,当我想测试时会出现问题.我想测试的是,如果其中一个IObservables observables产生一个OnError,其他的应该仍然能够发送它们的值,它们应该被处理
我以为我只用两个Subject<int>s代表两个IObservables observables; 发送一个OnError(new Exception())和另一个,然后发送OnNext(1).但是,它似乎Subject<int>将重播新订阅的所有先前值(实际上Retry()是这样),将测试变为无限循环.
我尝试通过创建一个手册来解决它,该手册IObservable在第一个订阅上产生错误,然后是一个空序列,但它感觉很hacky:
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
}); …Run Code Online (Sandbox Code Playgroud)