消耗观察者同时发出的值

vac*_*ach 5 java reactive-programming java-8 rx-java

我正在学习使用 RxJava 进行反应式编程,并且希望同时使用发出的值,而不会在单个执行线程中阻塞。

        Observable
            .interval(50, TimeUnit.MILLISECONDS)
            .take(5)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long counter) {
                    sleep(1000);
                    System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
                }
            });

    sleep(10000);
Run Code Online (Sandbox Code Playgroud)

我会得到这个输出

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1
Run Code Online (Sandbox Code Playgroud)

我如何异步处理每个事件?像这样

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5
Run Code Online (Sandbox Code Playgroud)

Dav*_*ton 4

在 Rx 中,一个 observable 代表并发性1,因此要同时处理彼此之间的通知,您必须将每个通知投影到一个 observable 中。

flatMap是异步顺序组合运算符。它将来自源可观察对象的每个通知投影到可观察对象中,从而允许您同时处理每个输入值。然后,它将每次计算的结果合并为具有不重叠通知的扁平可观察序列。

附录:

selectorfor中flatMap,根据目标平台,通常有多种方法来创建并发可观察量。我不懂 Java,但在 .NET 中,您通常会使用Observable.Start引入并发性或异步方法 ( async/await) 来利用本机异步,这通常是更可取的。

1从技术上讲,冷可观察量的单独订阅(观察者)可以实现 Rx 中的并发性,尽管从可观察量的角度思考通常更方便。请参阅此答案以获取更多信息。