Ále*_*lex 3 concurrency producer-consumer rx-java
我正在尝试了解RxJava并发性的一些细节,但不确定自己的想法是否正确。我对SubscribeOn / ObserveOn的工作方式有很好的了解,但是我正在尝试确定池调度程序的某些细节。为此,我正在考虑实现一个1-N生产者-消费者链,该链应尽可能多地拥有尽可能多的CPU。
根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据无功合同,操作员只能获得顺序呼叫。
因此,这样的设置
Observable.range(1, 1000) // Whatever has to be processed
.observeOn(Schedulers.computation())
.doOnNext(/* heavy computation */)
.doOnCompleted(() -> System.out.println("COMPLETED"))
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
即使使用线程池,也只会收到并发调用doOnNext。进行睡眠检查的实验OperatorObserveOn.java似乎证实了这一点,因为每次observeOn呼叫都会获得一名工人。另外,如果不是这样,则必须对OnCompleted进行复杂的管理,必须等待任何未完成的OnNext完成,但我发现这并不存在。
假设我在正确的轨道上(也就是说,只涉及一个线程,尽管您可以使用observeOn在其中几个线程之间跳转),那么正确的模式是什么?我可以找到相反情况的示例(将多个异步事件生成器同步到一个使用者中),但是对于这种典型情况,不是简单的示例。
我猜想涉及了flatMap,也许使用了beta版本(在1.x中),该版本限制了并发订阅的数量。可能像使用window / flatMap这样简单吗?
Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example
.flatMap(/* Processing */, 4) // For 4-concurrent processing
.subscribe()
Run Code Online (Sandbox Code Playgroud)
在这种方法中,我仍然缺少以Rx通用方式最大化CPU的简单方法(即,指定计算调度程序而不是对flatMap进行最大预订)。所以,也许...:
Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example
.flatMap(v -> Observable.just(v)
.observeOn(Schedulers.computation())
.map(/* heavy parallel computation */))
.subscribe()
Run Code Online (Sandbox Code Playgroud)
最后,在一些带有flatMap的示例中,我看到一个toBlock()调用,在flatMap此之后我不确定为什么需要调用,因为flatMap不应为下游执行序列化吗?(例如,在此示例中:http : //akarnokd.blogspot.com.es/2016/02/flatmap-part-1.html)
托马斯·尼尔德(Thomas Nield)有一篇很好的文章就是关于这种情况
我亲自做在这种情况下什么,我只是认购Schedulers.io的flatMap最多并发调用参数。
Observable.range(1, 1000) // Whatever has to be processed
.flatMap(v -> Observable.fromCallable(() -> { /* io calls */})
.subscribeOn(Schedulers.io()), Runtime.getRuntime().availableProcessors() + 1)
.subscribe();
Run Code Online (Sandbox Code Playgroud)
根据注释中的建议进行编辑,最好将其Schedulers.computation()用于CPU绑定工作
Observable.range(1, 1000) // Whatever has to be processed
.flatMap(v -> Observable.fromCallable(() -> { /* intense calculation */})
.subscribeOn(Schedulers.computation()))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
832 次 |
| 最近记录: |