如果第二个立即失败,ReactiveX concat不会从第一个observable产生onNext

Ros*_*hak 13 rx-java

我连续两个observable来显示缓存中的数据,然后开始从网络加载数据并显示更新的数据.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler),
    getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
 .subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)

如果没有网络连接,则在调用OnSubscribe后第二个observable会立即失败.

如果第二个observable立即失败,则第一个observable的数据将丢失.永远不会在订阅者中调用onNext方法.

我想,这可能是由于OperatorConcat.ConcatSubscriber中的以下代码造成的

    @Override
    public void onNext(Observable<? extends T> t) {
        queue.add(nl.next(t));
        if (WIP_UPDATER.getAndIncrement(this) == 0) {
            subscribeNext();
        }
    }

    @Override
    public void onError(Throwable e) {
        child.onError(e);
        unsubscribe();
    }
Run Code Online (Sandbox Code Playgroud)

看起来在收到错误后它取消订阅,并且所有挂起的onNext都将丢失.

解决问题的最佳方法是什么?

更新

看起来我找到了解决方案,而不是为连接的observable设置observOn我为每个observable设置了observOn.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
    getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
 .subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)

nik*_*kis 7

默认行为observeOnonError事件可以跳到队列前面,这里是文档的引用:

请注意,如果真正异步,onError通知将onNext在发射线程上的通知之前切断Scheduler.

这是一个很小的测试来说明事情:

Scheduler newThreadScheduler = Schedulers.newThread();

Observable<Integer> stream = Observable.create(integerEmitter -> {
    integerEmitter.onNext(1);
    integerEmitter.onNext(2);
    integerEmitter.onNext(3);
    integerEmitter.onNext(4);
    integerEmitter.onNext(5);
    integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);

TestSubscriber<Integer> subscriber = new TestSubscriber<>();
stream.subscribeOn(Schedulers.computation())
        .observeOn(newThreadScheduler).subscribe(subscriber);

subscriber.awaitTerminalEvent();

subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);
Run Code Online (Sandbox Code Playgroud)

通常消费者会期望以下顺序:1> 2> 3> 4> 5> Error.但是使用just observeOn可能会导致错误,测试将失败.

这个行为很久以前在这里实现了https://github.com/ReactiveX/RxJava/issues/1680,检查为什么这样做的动机.为了避免这种行为,人们可以使用重载的observeOn使用delayError参数:

指示onError通知是否可以onNext 在调度边界的另一侧的通知之前切断.如果true结束的序列onError将以与从上游接收的顺序重放

这是您通常所期望的,因此更改observeOn(newThreadScheduler)observeOn(newThreadScheduler, true)将修复测试.

然后是@Neil的问题:为什么@Rostyslav提出的解决方案有效?它正在工作,因为最终序列没有线程切换.

在提出的解决方案中,最终序列是从同一线程上的两个序列制作的:第一个序列是来自缓存的数据,第二个序列只是来自网络的错误.它们是在同一个线程上一起制作的,并且在没有线程切换之后 - 用户观察到了AndroidSchedulers.mainThread().如果你试图将final更改Scheduler为其他的,它将再次失败.