在处理时停止rxJava可观察链执行

cha*_*l03 6 android rx-java rx-java2

尽管调试rxJava网络电话在一个应用程序我碰到这样一种情况,如果我们disposeclear处置对象通过链的订阅返回observables那么只有第一个observable得到处理而不是其他链接observablesflatMap.

看看下面的演示代码片段:

CompositeDisposable testCompositeDisposal = new CompositeDisposable();

private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext");
    }).doOnDispose(()-> {
        Log.w("Debug: ", "doOnDispose: observable has been disposed");
    }).subscribe();

    testCompositeDisposal.add(disposable);
}

@Override
public void onStop() {
    super.onStop();
    testCompositeDisposal.clear();
}
Run Code Online (Sandbox Code Playgroud)

输出:

 W/Debug:: First: 0
 W/Debug:: doOnDispose: observable has been disposed // I dispose Observable chain here.
 W/Debug:: First: 1
 W/Debug:: First: 2
 W/Debug:: First: 3
 W/Debug:: First: 4
Run Code Online (Sandbox Code Playgroud)

正如您可以在上面的日志输出中看到的那样,当我处理给定的rxJava可观察链时,只有第一个可观察的停止发出项目.

我想要阻止那些被束缚的所有可观察物.

解决这个问题的惯用方法是什么?

Tas*_*kos 6

两件事情:

  • flatMap 可以从上游预先消耗物品(在Android上最多为16);
  • 其次,更适用于您的用例,在致电之前,onNext您应该检查(是否通过.isDisposed())处置了观察者,并在发生这种情况时中止了观察。

另外,第二个 flatMap被终止(实际上它从未被调用)。在一个延续。

编辑

private void testLoadData() { 
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            if(sbr.isDisposed()) return;  // this will cause subscription to terminate.
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true); 
        } 
        sbr.onComplete(); 
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> { 
        for (int i = 0; i < 5; i++) { 
            Thread.sleep(3000); 
            Log.w("Debug: ", "Second: " + i); 
            sbr.onNext(true); 
        } 
        sbr.onComplete(); 
    })).doOnNext(value -> { 
        Log.w("Debug: ", "doONNext"); 
    }).doOnDispose(()-> { 
        Log.w("Debug: ", "doOnDispose: observable has been disposed"); 
    }).subscribe(); 

    testCompositeDisposal.add(disposable); 
} 
Run Code Online (Sandbox Code Playgroud)