Rx Java mergeDelayError无法按预期工作

Bob*_*ke4 19 java android rx-java rx-android

我在RxAndroid中使用RxJava和Android应用程序.我正在使用mergeDelayError将两个逆向拟合网络调用组合成一个observable,如果发出一个,它将处理发出的项目,如果有的话,则处理错误.这不起作用,它只会在遇到错误时触发onError操作.现在为了测试这个我转移到一个非常简单的例子,当我有一个onError调用时,仍然不会调用successAction.见下面的例子.

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .finallyDo(completeAction)
            .subscribe(successAction, errorAction);
Run Code Online (Sandbox Code Playgroud)

只有在使用两个成功的可观察对象时才会调用成功操作.我错过了mergeDelayError应该如何工作的东西?

编辑:

我发现,如果我删除了observeOn,subscribeOn一切都按预期工作.我需要指定线程和思想,这是使用Rx的重点.知道为什么指定那些Schedulers会破坏行为吗?

niz*_*.sp 14

使用.observeOn(AndroidSchedulers.mainThread(),true)而不是.observeOn(AndroidSchedulers.mainThread()

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
    }
Run Code Online (Sandbox Code Playgroud)

以上是observeOn函数的签名.以下代码有效.

  Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
        )
                .observeOn(AndroidSchedulers.mainThread(), true)
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {

                    }
                });
Run Code Online (Sandbox Code Playgroud)

从ConcatDelayError线程得到这个技巧:https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009

  • 我认为这应该是公认的答案!我整天都在调试此问题。无论如何,您必须指定两次要延迟错误的时间,这非常令人困惑! (3认同)

Bob*_*ke4 5

这看起来仍然像是 mergeDelayError 运算符中的一个错误,但我可以通过为每个可观察的复制observerOn和Subscribe on来使其工作。

Observable.mergeDelayError(
            Observable.error(new RuntimeException())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io()),
            Observable.just("Hello")
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io())
        )
        .finallyDo(completeAction)
        .subscribe(successAction, errorAction);
Run Code Online (Sandbox Code Playgroud)