Observable.zip选择不同的调度程序来订阅不同的呼叫

wil*_*dev 1 java rx-java reactivex

我有两个观测值o1和o2。我通过Observable.zip()函数压缩它们,但是每次都在不同的调度程序上进行订阅。我希望所有o1,o2和z observable都应该在Schedulers.io()上可订阅。但是每次都是Schedulers.io()或Schedulers.computation()是随机的。

这是我的重现问题的源代码

import rx.Observable;
import rx.schedulers.Schedulers;

public class RxZipSchedulers {

    public static void main(String[] args) {

        for(int i=0;i<100;i++) {
            Observable<String> o1 = Observable.just("o1").subscribeOn(Schedulers.computation());
            Observable<String> o2 = Observable.just("o2");

            Observable z = Observable.zip(o1, o2, (s1, s2) -> s1 + " " + s2 + " " + Thread.currentThread());

            z.subscribeOn(Schedulers.io())
                    .subscribe(res -> {
                        System.out.println(res);
                    });

            z.toCompletable().await();
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

在我的机器上,输出是这样的(每次都要注意RxComputationScheduler或RxIoScheduler):

o1 o2线程[RxComputationScheduler-1,5,main]

o1 o2线程[RxComputationScheduler-4,5,main]

o1 o2线程[RxComputationScheduler-1,5,main]

o1 o2线程[RxComputationScheduler-3,5,main]

o1 o2线程[RxComputationScheduler-4,5,main]

o1 o2线程[RxComputationScheduler-3,5,main]

o1 o2线程[RxIoScheduler-3,5,main]

o1 o2线程[RxIoScheduler-2,5,main]

o1 o2线程[RxComputationScheduler-1,5,main]

o1 o2线程[RxComputationScheduler-3,5,main]

o1 o2线程[RxIoScheduler-3,5,main]

o1 o2线程[RxIoScheduler-2,5,main]

为什么所有o1,o2,z可观察对象都未订阅Schedulers.io()?我认为应该将此调度程序传播到整个可观察对象链上,但这只是有时发生。

yos*_*riz 5

有什么情况发生zip()操作是,zip函数的线程上调用(根据Scheduler)最后发射的Observable,那就是,在这种情况下,随机o1o2调度程序是computation()io()和。

为什么每个is computation()和the的调度程序io()
尽管您已在zip中指定了Scheduler Observable,但它只会影响的订阅操作zip(),而不必影响每个压缩Observable操作的位置。

在RxJava每个Observable可以指定自己的Scheduler,而当zip()将认购压缩的Observable,每次都会对配置进行操作Scheduler
o1情况下- computation因为它是有明确配置subscribeOn()
但是,如果未Scheduler指定Observable-则Observable将对Scheduler已完成的订阅进行操作-在您的情况下,情况就是这样o2,因为在zip订阅被调用的地方调用了它的订阅io()

如果您出于某种原因确实在意此zip()功能的实现,则可以简单地进行2个压缩的对象,并通过observerOn()更改操作和执行自定义zip逻辑的位置map()