vac*_*ach 5 java reactive-programming java-8 rx-java
我正在学习使用 RxJava 进行反应式编程,并且希望同时使用发出的值,而不会在单个执行线程中阻塞。
Observable
.interval(50, TimeUnit.MILLISECONDS)
.take(5)
.subscribe(new Action1<Long>() {
@Override
public void call(Long counter) {
sleep(1000);
System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
}
});
sleep(10000);
Run Code Online (Sandbox Code Playgroud)
我会得到这个输出
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1
Run Code Online (Sandbox Code Playgroud)
我如何异步处理每个事件?像这样
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5
Run Code Online (Sandbox Code Playgroud)
在 Rx 中,一个 observable 代表并发性1,因此要同时处理彼此之间的通知,您必须将每个通知投影到一个 observable 中。
flatMap是异步顺序组合运算符。它将来自源可观察对象的每个通知投影到可观察对象中,从而允许您同时处理每个输入值。然后,它将每次计算的结果合并为具有不重叠通知的扁平可观察序列。
附录:
在selectorfor中flatMap,根据目标平台,通常有多种方法来创建并发可观察量。我不懂 Java,但在 .NET 中,您通常会使用Observable.Start引入并发性或异步方法 ( async/await) 来利用本机异步,这通常是更可取的。
1从技术上讲,冷可观察量的单独订阅(观察者)可以实现 Rx 中的并发性,尽管从可观察量的角度思考通常更方便。请参阅此答案以获取更多信息。
| 归档时间: |
|
| 查看次数: |
505 次 |
| 最近记录: |