wil*_*dev 1 java rx-java reactivex
我有两个观测值o1和o2。我通过Observable.zip()函数压缩它们,但是每次都在不同的调度程序上进行订阅。我希望所有o1,o2和z observable都应该在Schedulers.io()上可订阅。但是每次都是Schedulers.io()或Schedulers.computation()是随机的。
这是我的重现问题的源代码
import rx.Observable;
import rx.schedulers.Schedulers;
public class RxZipSchedulers {
public static void main(String[] args) {
for(int i=0;i<100;i++) {
Observable<String> o1 = Observable.just("o1").subscribeOn(Schedulers.computation());
Observable<String> o2 = Observable.just("o2");
Observable z = Observable.zip(o1, o2, (s1, s2) -> s1 + " " + s2 + " " + Thread.currentThread());
z.subscribeOn(Schedulers.io())
.subscribe(res -> {
System.out.println(res);
});
z.toCompletable().await();
}
}
}
Run Code Online (Sandbox Code Playgroud)
在我的机器上,输出是这样的(每次都要注意RxComputationScheduler或RxIoScheduler):
o1 o2线程[RxComputationScheduler-1,5,main]
o1 o2线程[RxComputationScheduler-4,5,main]
o1 o2线程[RxComputationScheduler-1,5,main]
o1 o2线程[RxComputationScheduler-3,5,main]
o1 o2线程[RxComputationScheduler-4,5,main]
o1 o2线程[RxComputationScheduler-3,5,main]
o1 o2线程[RxIoScheduler-3,5,main]
o1 o2线程[RxIoScheduler-2,5,main]
o1 o2线程[RxComputationScheduler-1,5,main]
o1 o2线程[RxComputationScheduler-3,5,main]
o1 o2线程[RxIoScheduler-3,5,main]
o1 o2线程[RxIoScheduler-2,5,main]
为什么所有o1,o2,z可观察对象都未订阅Schedulers.io()?我认为应该将此调度程序传播到整个可观察对象链上,但这只是有时发生。
有什么情况发生zip()操作是,zip函数的线程上调用(根据Scheduler)最后发射的Observable,那就是,在这种情况下,随机o1和o2调度程序是computation()和io()和。
为什么每个is computation()和the的调度程序io()?
尽管您已在zip中指定了Scheduler Observable,但它只会影响的订阅操作zip(),而不必影响每个压缩Observable操作的位置。
在RxJava每个Observable可以指定自己的Scheduler,而当zip()将认购压缩的Observable,每次都会对配置进行操作Scheduler:
在o1情况下- computation因为它是有明确配置subscribeOn()。
但是,如果未Scheduler指定Observable-则Observable将对Scheduler已完成的订阅进行操作-在您的情况下,情况就是这样o2,因为在zip订阅被调用的地方调用了它的订阅io()。
如果您出于某种原因确实在意此zip()功能的实现,则可以简单地进行2个压缩的对象,并通过observerOn()更改操作和执行自定义zip逻辑的位置map()。
| 归档时间: |
|
| 查看次数: |
670 次 |
| 最近记录: |