我很难理解以下代码示例的行为;
Flowable<String> f = Flowable.just(1)
.flatMap(it -> Flowable.create(e -> {
for(int i = 1; i < 1001; ++i) {
log.info("Emitting: " + i);
if(i % 10 == 0) {
Thread.sleep(1000);
}
e.onNext(i);
}
e.onComplete();
}, BackpressureStrategy.BUFFER))
.map(String::valueOf)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread());
f.subscribe(val -> {
Thread.sleep(100);
log.info("Observing: " + val);
});
Thread.sleep(1000000);
Run Code Online (Sandbox Code Playgroud)
代码运行正常,直到subscribe调用观察到 128 个项目。发射和观察是并行的。但在那之后,Flowable 继续发出项目(显然在某处排队)但直到所有 1000 个项目都发出后才观察到任何项目。在发出所有 1000 个项目后,立即观察其余项目(> 128 个)。
这看起来与 128 的背压 bufferSize 相关,但我仍然希望发射和观察对于整个 1000 个项目是并行的,因为观察者显然不比发射器慢。有什么我在这里想念的吗?我应该怎么做才能修复代码?
这是由于 create 和subscribeOn之间存在相同的池死锁:
如果
create(FlowableOnSubscribe, BackpressureStrategy)链中存在类型源,建议改为使用subscribeOn(scheduler, false)以避免同池死锁,因为请求可能会堆积在急切/阻塞发射器后面。
//...
.subscribeOn(Schedulers.io(), false)
//...
Run Code Online (Sandbox Code Playgroud)
编辑:
我通过用 Flowable.range 替换 Flowable.create 来尝试原始示例(加上您建议的修复),但我没有遇到问题。你能举例说明什么时候可能出现问题吗?
Flowable.range(1, 10)
.subscribeOn(Schedulers.io(), false)
.doOnNext(v -> System.out.println(Thread.currentThread().getName()))
.observeOn(Schedulers.single(), false, 1)
.blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)
这首先打印RxCachedThreadScheduler-1则RxSingleScheduler-19次因observeOn的补充要求将在单一的排程,而不是路由回IO调度运行。用 subscribeOn true 试试这个。
| 归档时间: |
|
| 查看次数: |
798 次 |
| 最近记录: |