具有不同调度程序的 flatMap 的行为

Buc*_*bue 4 rx-java2

你能解释一下为什么我在 flatMap 中更改返回的 Observable 的调度程序时会得到奇怪的输出吗?例如,我有

Observable.range(1, 9)
        .flatMap {
            if (it < 5) {
                Observable.just(it)
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
            } else {
                Observable.just(it)
            }
        }
        .subscribe({ println("${it}: ${Thread.currentThread().name}") })
println("END")
Thread.sleep(200)
Run Code Online (Sandbox Code Playgroud)

作为输出,我每次运行都有不同的结果。例如第一次发射

1: RxCachedThreadScheduler-3
2: RxCachedThreadScheduler-3
3: RxCachedThreadScheduler-3
5: main
6: main
7: main
END
4: RxCachedThreadScheduler-6
8: RxCachedThreadScheduler-6
9: RxCachedThreadScheduler-6
Run Code Online (Sandbox Code Playgroud)

第二次发射输出:

5: main
1: main
2: main
3: main
6: main
7: main
8: main
9: main
END
4: RxCachedThreadScheduler-8
Run Code Online (Sandbox Code Playgroud)

aka*_*okd 7

flatMap非确定性地在其中一个参与线程上合并,因此即使内部源已经subscribeOn和/或observeOn定义,也不能保证哪个线程将在特定时刻赢得从源发出项目。因此,如果您想确保后续事件处理发生在所需的线程上(直到有另一个异步边界运算符),您必须应用observeOnafter flatMap