RxJava doOnError和onErrorReturn如何工作?

Nil*_*zor 20 java system.reactive rx-java

我做了这些单元测试,结果不是我预期的结果:

// This one outputs "subscribe.onError" 
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.doOnError(throwable -> System.out.println("doOnError"));
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError" 
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.onErrorReturn(throwable -> "Yeah I got this");
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}
Run Code Online (Sandbox Code Playgroud)

所以两者都输出"subscribe.onError" - 似乎既没有doOnError也没有onErrorReturn被调用.

doOnError记录为:

修改源Observable,以便在调用onError时调用它.

我不确定如何解释它,但我希望输出"doOnError"或"doOnError"后跟"subscribe.onError".

onErrorReturn 记录为:

指示Observable发出一个项目(由指定函数返回),而不是在遇到错误时调用onError.

因此我期待"得到:是的,我得到了这个"作为后一个测试的输出.

是什么赋予了?

mar*_*ran 16

双方doOnErroronErrorReturn返回一个新的Observable与行为改变.我同意他们的文件可能有点误导.像这样修改你的测试以获得预期的行为:

// This one outputs "subscribe.onError" 
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = 
        getErrorProducingObservable()
            .doOnError(throwable -> System.out.println("doOnError"));

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError" 
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = 
        getErrorProducingObservable()
            .onErrorReturn(throwable -> "Yeah I got this");

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}
Run Code Online (Sandbox Code Playgroud)

  • 啊,谢谢.现在它按预期工作,我了解到`doOnError`不会改变订阅者的行为 - 你只需要添加第二个错误监听器. (2认同)