Paw*_*hra 26 java system.reactive rx-java
我使用RxJava Observable api获得以下代码:
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
observable
.buffer(10000)
.observeOn(Schedulers.computation())
.subscribe(recordInfo -> {
_logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
for(Info info : recordInfo) {
// some I/O operation logic
}
},
exception -> {
},
() -> {
});
Run Code Online (Sandbox Code Playgroud)
我的期望是,在指定了计算调度程序之后,观察代码即subscribe()方法中的代码将并行执行.相反,代码仍然在单个线程上顺序执行.如何使用RxJava api使代码并行运行.
Lor*_*nMK 55
当涉及到异步/多线程方面时,RxJava经常被误解.多线程操作的编码很简单,但理解抽象是另一回事.
关于RxJava的一个常见问题是如何从Observable实现并行化或同时发出多个项目.当然,这个定义违反了Observable Contract,它声明onNext()必须按顺序调用,而不是一次由多个线程同时调用.
要实现并行性,您需要多个Observable.
这在一个线程中运行:
Observable<Integer> vals = Observable.range(1,10);
vals.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
Run Code Online (Sandbox Code Playgroud)
这在多个线程中运行:
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
Run Code Online (Sandbox Code Playgroud)
mic*_*brz 11
RxJava 2.0.5引入了并行流和ParallelFlowable,这使得并行执行更简单,更具说明性.
你不再需要创建Observable/ Flowable内flatMap,您也可以叫parallel()上Flowable并返回ParallelFlowable.
它并不像常规那样功能丰富Flowable,因为并发会引发Rx契约的许多问题,但是你有基本的map(),filter()还有更多,在大多数情况下这应该足够了.
所以代替来自@LordRaydenMK的这个流程回答
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
Run Code Online (Sandbox Code Playgroud)
现在你可以这样做:
Flowable<Integer> vals = Flowable.range(1, 10);
vals.parallel()
.runOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.sequential()
.subscribe(val -> System.out.println(val));
Run Code Online (Sandbox Code Playgroud)