你能解释一下为什么我在 flatMap 中更改返回的 Observable 的调度程序时会得到奇怪的输出吗?例如,我有
Observable.range(1, 9)
.flatMap {
if (it < 5) {
Observable.just(it)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
} else {
Observable.just(it)
}
}
.subscribe({ println("${it}: ${Thread.currentThread().name}") })
println("END")
Thread.sleep(200)
Run Code Online (Sandbox Code Playgroud)
作为输出,我每次运行都有不同的结果。例如第一次发射
1: RxCachedThreadScheduler-3
2: RxCachedThreadScheduler-3
3: RxCachedThreadScheduler-3
5: main
6: main
7: main
END
4: RxCachedThreadScheduler-6
8: RxCachedThreadScheduler-6
9: RxCachedThreadScheduler-6
Run Code Online (Sandbox Code Playgroud)
第二次发射输出:
5: main
1: main
2: main
3: main
6: main
7: main
8: main
9: main
END
4: RxCachedThreadScheduler-8
Run Code Online (Sandbox Code Playgroud)
flatMap非确定性地在其中一个参与线程上合并,因此即使内部源已经subscribeOn和/或observeOn定义,也不能保证哪个线程将在特定时刻赢得从源发出项目。因此,如果您想确保后续事件处理发生在所需的线程上(直到有另一个异步边界运算符),您必须应用observeOnafter flatMap。
| 归档时间: |
|
| 查看次数: |
926 次 |
| 最近记录: |