RxJava和观察者代码的并行执行

Paw*_*hra 26 java system.reactive rx-java

我使用RxJava Observable api获得以下代码:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });
Run Code Online (Sandbox Code Playgroud)

我的期望是,在指定了计算调度程序之后,观察代码即subscribe()方法中的代码将并行执行.相反,代码仍然在单个线程上顺序执行.如何使用RxJava api使代码并行运行.

Lor*_*nMK 55

当涉及到异步/多线程方面时,RxJava经常被误解.多线程操作的编码很简单,但理解抽象是另一回事.

关于RxJava的一个常见问题是如何从Observable实现并行化或同时发出多个项目.当然,这个定义违反了Observable Contract,它声明onNext()必须按顺序调用,而不是一次由多个线程同时调用.

要实现并行性,您需要多个Observable.

这在一个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));
Run Code Online (Sandbox Code Playgroud)

这在多个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
Run Code Online (Sandbox Code Playgroud)

代码和文字来自这篇博.


mic*_*brz 11

RxJava 2.0.5引入了并行流ParallelFlowable,这使得并行执行更简单,更具说明性.

你不再需要创建Observable/ FlowableflatMap,您也可以叫parallel()Flowable并返回ParallelFlowable.

它并不像常规那样功能丰富Flowable,因为并发会引发Rx契约的许多问题,但是你有基本的map(),filter()还有更多,在大多数情况下这应该足够了.

所以代替来自@LordRaydenMK的这个流程回答

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));
Run Code Online (Sandbox Code Playgroud)

现在你可以这样做:

Flowable<Integer> vals = Flowable.range(1, 10);

vals.parallel()
        .runOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
        .sequential()
        .subscribe(val -> System.out.println(val));
Run Code Online (Sandbox Code Playgroud)

  • AFAIK,flowable 用于具有背压的流。如果我们不需要背压,那么我们应该使用 Observable。是否有像 flowable 一样的 Observable 的并行实现? (2认同)