如果第二个可观察到的开始发射出rxjava开关可观察到

xym*_*lon 0 rx-java

我有一组并行执行的可观察对象,例如localObservablenetworkObservable。如果networkObservable开始发射物品(从现在开始,我只需要这些物品),则丢弃发射的物品localObservable(也许localObservable尚未开始)。

Observable<Integer> localObservable =
            Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
            Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
Run Code Online (Sandbox Code Playgroud)

yos*_*riz 6

您可以执行以下操作:

 Observable<Long> networkObservable =
            Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .share();
    Observable<Long> localObservable =
            Observable.interval(500, TimeUnit.MILLISECONDS)                       
                    .subscribeOn(Schedulers.io())
                    .takeUntil(networkObservable);

    Observable.merge(networkObservable, localObservable)
            .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

这将输出:

0 // localObservable 
1 // localObservable 
0 // networkObservable from here on
1
2
...
Run Code Online (Sandbox Code Playgroud)

takeUntillocalObservable在第一个发射networkObservable发生时停止并取消订阅,因此只要没有开始,合并的Observable发射就开始,而合并时,它将停止从发射并切换为仅从发射。localObservablenetworkObservablelocalObservablenetworkObservable