相关疑难解决方法(0)

如何处理RxJava中观察者的onNext引发的异常?

请考虑以下示例:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这将输出1到5之间的数字,然后打印异常.

我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.

我已经尝试过使用retry()其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.

最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.

我该怎么做才能解决这个问题?

java reactive-programming system.reactive rx-java

27
推荐指数
1
解决办法
6656
查看次数

非重播热观察

原始问题

我有一个场景,我有多个IObservable序列,我想结合,Merge然后听.但是,如果其中一个产生错误,我不希望它崩溃其他流的所有内容,以及重新订阅序列(这是一个'永久'序列).

我这样做是通过Retry()在合并之前向流附加一个,即:

IEnumerable<IObservable<int>> observables = GetObservables();

observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);
Run Code Online (Sandbox Code Playgroud)

但是,当我想测试时会出现问题.我想测试的是,如果其中一个IObservables observables产生一个OnError,其他的应该仍然能够发送它们的值,它们应该被处理

我以为我只用两个Subject<int>s代表两个IObservables observables; 发送一个OnError(new Exception())和另一个,然后发送OnNext(1).但是,它似乎Subject<int>将重播新订阅的所有先前值(实际上Retry()是这样),将测试变为无限循环.

我尝试通过创建一个手册来解决它,该手册IObservable在第一个订阅上产生错误,然后是一个空序列,但它感觉很hacky:

var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
}); …
Run Code Online (Sandbox Code Playgroud)

.net c# subject system.reactive

5
推荐指数
1
解决办法
1317
查看次数