是否可以并行启动Mono并汇总结果

pos*_*ver 10 project-reactor spring-webflux

我知道可以链接Mono的,例如......

Mono<String> resultAMono = loadA();
Mono<String> resultBMono = resultA.flatMap(resultA -> loadB());
Run Code Online (Sandbox Code Playgroud)

这将链和resultBMono将在resultAMono返回时运行....

所以我的问题是,是否有可能并行启动2个单声道并且当两个单声道继续使用另一个单声道时?

我认为它看起来像这样......

Mono<String> resultAMono = loadA();
Mono<String> resuktBMono = loadB();
Mono<Tuple2<Stirng, String> tupleMono = Mono.zip(resultAMono, resultBMono);
Run Code Online (Sandbox Code Playgroud)

但是我不知道这将并行运行或者我可以做什么来并行运行...

谢谢答案....

Sim*_*slé 12

2种语义,一种使它们并行运行的方式

我在下面介绍的两个选项都需要进行一些其他调整才能使A和B Mono并行运行:即,每个选项Mono都应用于subscribeOn(Scheduler)退出合并的公共线程。

如果您只关心A和B的完成

使用when监听A和B完成,并then继续与一个完全不同的Mono

Mono.when(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .then(Mono.just("A and B finished, I don't know their value"));
Run Code Online (Sandbox Code Playgroud)

如果您关心A和B值

使用zip+ map/ flatMap取决于要对结果执行的操作。

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .map(tuple2 -> new Foo(tuple2.getT1(), tuple2.getT2(), "bar");
Run Code Online (Sandbox Code Playgroud)

要么

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .flatMap(tuple2 -> fetchMoreDataAsMono(tuple2.getT1(), tuple2.getT2()));
Run Code Online (Sandbox Code Playgroud)

then将忽略之前的数据,因此在使用zip之前没有太大意义。

同样,如果A或B zip中的一个为 ,则会导致Mono为空! 使用switchIfEmpty/ defaultIfEmpty防止出现这种情况。

  • 对于并行执行 `myMono.subscribeOn(Schedulers.boundedElastic());` (3认同)
  • ++用于`subscribeOn(Scheduler)`提示! (2认同)