为什么RxJava zip运算符在最后发出值的线程上工作?

Mar*_*och 1 multithreading android rx-java

我正在尝试压缩两个在不同线程上发出的Observable:

Observable<String> xxxx1 = Observable.fromCallable((Func0<String>) () -> {
    try {
        Thread.sleep((long)(Math.random() * 1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "First";
})
        .doOnNext(s -> Log.d("TEEEST", "1 onNext " + s + " thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.computation());

Observable<String> xxxx2 = Observable.fromCallable((Func0<String>) () -> {
    try {
        Thread.sleep((long)(Math.random() * 1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Second";
})
        .doOnNext(s -> Log.d("TEEEST", "2 onNext " + s + " thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.io());

Observable.zip(xxxx1, xxxx2, (s1, s2) -> {
    Log.d("TEEEST", "zip func thread " + Thread.currentThread().getName());
    return s1.concat(s2);
})
        .map(s -> {
            Log.d("TEEEST", "map thread " + Thread.currentThread().getName());
            return s.concat(" mapped");
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            Log.d("TEEEST", "call " + s + " thread " + Thread.currentThread().getName());
        });
Run Code Online (Sandbox Code Playgroud)

似乎zip可以在发出值作为最后一个值的线程上工作,这是我的日志的样子:

首轮:

D/TEEEST: 2 onNext Second thread RxIoScheduler-3
D/TEEEST: 1 onNext First thread RxComputationScheduler-1
D/TEEEST: zip func thread RxComputationScheduler-1
D/TEEEST: map thread RxComputationScheduler-1
D/TEEEST: call FirstSecond mapped thread main
Run Code Online (Sandbox Code Playgroud)

第二轮:

D/TEEEST: 1 onNext First thread RxComputationScheduler-2
D/TEEEST: 2 onNext Second thread RxIoScheduler-2
D/TEEEST: zip func thread RxIoScheduler-2
D/TEEEST: map thread RxIoScheduler-2
D/TEEEST: call FirstSecond mapped thread main
Run Code Online (Sandbox Code Playgroud)
  1. 此行为记录在某处吗?
  2. 为什么会这样。
  3. 如何确保zip功能和所有下游内容(map在我的情况下为运算符)适用于特定的Scheduler,而不是随机的。

tyn*_*ynn 6

zip默认情况下不会在特定的上运行Scheduler

zip仅在具有所有要压缩的值时发出。因此它在收到最后一个值的同一线程上发出。

为了确保所有下游操作都在特定的调度程序上进行,您必须定义一个observeOn()副作用。

对于所有下游操作,观察压缩结果就足够了。

Observable.zip(...).observeOn(scheduler)
Run Code Online (Sandbox Code Playgroud)

对于上游,您必须观察在此特定调度程序上压缩的观察者。

Observable.zip(o1.observeOn(scheduler), o2.observeOn(scheduler), ...)
Run Code Online (Sandbox Code Playgroud)

但是,取决于调度程序,您不能使用该线程。