Project Reactor并行执行

Mik*_*dan 4 java reactive-programming project-reactor

Project Reactor 3.1.5.RELEASE

考虑一下:

Flux.range(0, 10)
    .publishOn(Schedulers.parallel())
    .subscribe(i -> LOG.info(i));
Run Code Online (Sandbox Code Playgroud)

我期望订阅者在多个线程中运行,但它只运行在一个:

2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 0
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 1
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 2
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 3
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 4
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 5
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 6
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 7
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 8
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 9
Run Code Online (Sandbox Code Playgroud)

文档告诉我的期望是正确的(http://projectreactor.io/docs/core/release/reference/#threading).有人可以向我解释那里发生了什么吗?

aka*_*okd 14

反应流本质上是顺序的,publishOn只是告诉源在哪里一个接一个地发出每个值.您需要告诉流程并行通过parallel,然后通过runOn以下方式指定调度程序:

Flux.range(0, 10)
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(i -> LOG.info(i))
.sequential()
.subscribe();
Run Code Online (Sandbox Code Playgroud)

  • 关于在并行和平面图之间进行选择,请参阅此答案:/sf/answers/3029179401/ (2认同)