K.N*_*las 5 java reactor reactive-programming project-reactor spring-webflux
我创建了一个ParallelFlux然后使用了.sequential(),期望在那时我可以计算或“减少”并行计算的结果。问题似乎是并行线程被触发,没有什么在等待它们。
我已经使用 a 完成了一些工作,CountDownLatch但我认为我不应该这样做。
TL; DR - 我无法打印出此代码的结果:
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
在 main 中执行该代码时,这很好地展示了事物的异步性质。该操作在来自弹性调度程序的线程上运行,订阅触发异步处理,因此它立即返回。
有两种方法可以将应用程序的结尾与 的结尾同步Flux:
打印doOnNext并使用blockLast()
block* 方法通常用于 main 和 tests,在那里别无选择,只能切换回阻塞模型(即因为否则 test/main 将在处理结束之前退出)。
我们切换在 a 中打印每个发出的项目的“副作用” doOnNext,它专用于那种用例。然后我们阻塞直到通量完成,用blockLast()。
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doOnNext(System.out::println)
.blockLast();
Run Code Online (Sandbox Code Playgroud)
打印subscribe并使用CountDownLatchindoFinally
这个有点复杂,但如果您想探索该方法的工作原理,则允许您实际使用订阅。
我们只需添加一个doFinally,它会在任何完成信号(取消、错误或完成)上触发。我们使用 aCountDownLatch来阻塞主线程,主线程将从doFinally.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doFinally(signal -> latch.countDown())
.subscribe(System.out::println);
latch.await(10, TimeUnit.SECONDS);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1615 次 |
| 最近记录: |