我有一组并行执行的可观察对象,例如localObservable和networkObservable。如果networkObservable开始发射物品(从现在开始,我只需要这些物品),则丢弃发射的物品localObservable(也许localObservable尚未开始)。
Observable<Integer> localObservable =
Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
Run Code Online (Sandbox Code Playgroud)
您可以执行以下操作:
Observable<Long> networkObservable =
Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.share();
Observable<Long> localObservable =
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.takeUntil(networkObservable);
Observable.merge(networkObservable, localObservable)
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
这将输出:
0 // localObservable
1 // localObservable
0 // networkObservable from here on
1
2
...
Run Code Online (Sandbox Code Playgroud)
takeUntil会localObservable在第一个发射networkObservable发生时停止并取消订阅,因此只要没有开始,合并的Observable发射就开始,而合并时,它将停止从发射并切换为仅从发射。localObservablenetworkObservablelocalObservablenetworkObservable
| 归档时间: |
|
| 查看次数: |
778 次 |
| 最近记录: |