RxJava-一个生产者,一个订阅中有许多并发消费者

Ále*_*lex 3 concurrency producer-consumer rx-java

我正在尝试了解RxJava并发性的一些细节,但不确定自己的想法是否正确。我对SubscribeOn / ObserveOn的工作方式有很好的了解,但是我正在尝试确定池调度程序的某些细节。为此,我正在考虑实现一个1-N生产者-消费者链,该链应尽可能多地拥有尽可能多的CPU。

根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据无功合同,操作员只能获得顺序呼叫。

因此,这样的设置

Observable.range(1, 1000) // Whatever has to be processed
            .observeOn(Schedulers.computation())
            .doOnNext(/* heavy computation */)
            .doOnCompleted(() -> System.out.println("COMPLETED"))
            .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

即使使用线程池,也只会收到并发调用doOnNext。进行睡眠检查的实验OperatorObserveOn.java似乎证实了这一点,因为每次observeOn呼叫都会获得一名工人。另外,如果不是这样,则必须对OnCompleted进行复杂的管理,必须等待任何未完成的OnNext完成,但我发现这并不存在。

假设我在正确的轨道上(也就是说,只涉及一个线程,尽管您可以使用observeOn在其中几个线程之间跳转),那么正确的模式是什么?我可以找到相反情况的示例(将多个异步事件生成器同步到一个使用者中),但是对于这种典型情况,不是简单的示例。

我猜想涉及了flatMap,也许使用了beta版本(在1.x中),该版本限制了并发订阅的数量。可能像使用window / flatMap这样简单吗?

Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example 
.flatMap(/* Processing */, 4) // For 4-concurrent processing
.subscribe()
Run Code Online (Sandbox Code Playgroud)

在这种方法中,我仍然缺少以Rx通用方式最大化CPU的简单方法(即,指定计算调度程序而不是对flatMap进行最大预订)。所以,也许...:

Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example 
.flatMap(v -> Observable.just(v)
                        .observeOn(Schedulers.computation())
                        .map(/* heavy parallel computation */))
.subscribe()
Run Code Online (Sandbox Code Playgroud)

最后,在一些带有flatMap的示例中,我看到一个toBlock()调用,在flatMap此之后我不确定为什么需要调用,因为flatMap不应为下游执行序列化吗?(例如,在此示例中:http : //akarnokd.blogspot.com.es/2016/02/flatmap-part-1.html

Ale*_*yev 5

托马斯·尼尔德(Thomas Nield)有一篇很好的文章就是关于这种情况

RxJava-最大化并行化

我亲自做在这种情况下什么,我只是认购Schedulers.ioflatMap最多并发调用参数。

    Observable.range(1, 1000) // Whatever has to be processed
            .flatMap(v -> Observable.fromCallable(() -> { /* io calls */}) 
                    .subscribeOn(Schedulers.io()), Runtime.getRuntime().availableProcessors() + 1)
            .subscribe();
Run Code Online (Sandbox Code Playgroud)

根据注释中的建议进行编辑,最好将其Schedulers.computation()用于CPU绑定工作

    Observable.range(1, 1000) // Whatever has to be processed
            .flatMap(v -> Observable.fromCallable(() -> { /* intense calculation */}) 
                    .subscribeOn(Schedulers.computation()))
            .subscribe();
Run Code Online (Sandbox Code Playgroud)