Mar*_*ert 4 multithreading android rx-java rx-android
我是RxJava的新手并且在(我猜)简单问题上苦苦挣扎.我想在3个线程中处理订阅部分simuleasly.这就是我使用FixedThreadPool的原因.示例代码:
Observer.just("one", "two", "three", "four")
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))
.subscribe(new Observer<String>() {
public void onNext(String string) {
Log.d(TAG, "Started: " + string);
SystemClock.sleep(1000);
Log.d(TAG, "Ended: " + string);
}
(...)
}
Run Code Online (Sandbox Code Playgroud)
预期结果:
Started: one
Started: two
Started: three
Ended: one
Started: four
Ended: two
Ended: three
Ended: four
Run Code Online (Sandbox Code Playgroud)
实际结果:
Started: one
Ended: one
Started: two
Ended: two
Started: three
Ended: three
Started: four
Ended: four
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?
RxJava Observable是顺序的,subscribeOn并且observeOn运算符不会相互并行运行值.
您可以实现的最接近的事情是通过模数键对值进行分组,运行它们observeOn并合并结果:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 100)
.groupBy(v -> count.getAndIncrement() % 3)
.flatMap(g -> g
.observeOn(Schedulers.computation())
.map(v -> Thread.currentThread() + ": " + v))
.toBlocking()
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1968 次 |
| 最近记录: |