等待 ParallelFlux 完成

K.N*_*las 5 java reactor reactive-programming project-reactor spring-webflux

我创建了一个ParallelFlux然后使用了.sequential(),期望在那时我可以计算或“减少”并行计算的结果。问题似乎是并行线程被触发,没有什么在等待它们。

我已经使用 a 完成了一些工作,CountDownLatch但我认为我不应该这样做。

TL; DR - 我无法打印出此代码的结果:

    Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

Sim*_*slé 5

在 main 中执行该代码时,这很好地展示了事物的异步性质。该操作在来自弹性调度程序的线程上运行,订阅触发异步处理,因此它立即返回。

有两种方法可以将应用程序的结尾与 的结尾同步Flux

打印doOnNext并使用blockLast()

block* 方法通常用于 main 和 tests,在那里别无选择,只能切换回阻塞模型(即因为否则 test/main 将在处理结束之前退出)。

我们切换在 a 中打印每个发出的项目的“副作用” doOnNext,它专用于那种用例。然后我们阻塞直到通量完成,用blockLast()

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doOnNext(System.out::println)
    .blockLast();
Run Code Online (Sandbox Code Playgroud)

打印subscribe并使用CountDownLatchindoFinally

这个有点复杂,但如果您想探索该方法的工作原理,则允许您实际使用订阅。

我们只需添加一个doFinally,它会在任何完成信号(取消、错误或完成)上触发。我们使用 aCountDownLatch来阻塞主线程,主线程将从doFinally.

CountDownLatch latch = new CountDownLatch(1);

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doFinally(signal -> latch.countDown())
    .subscribe(System.out::println);

latch.await(10, TimeUnit.SECONDS);
Run Code Online (Sandbox Code Playgroud)