Project Reactor 中的 flatMap、flatMapSequential 和 concatMap 有什么区别?

rop*_*pes 9 reactor project-reactor reactive-streams spring-webflux

我从文档中读到flatMap

将此 Flux 发出的元素异步转换为 Publisher,然后通过合并将这些内部发布者扁平化为单个 Flux,从而允许它们交错。

flatMapSequential

将此 Flux 发出的元素异步转换为 Publishers,然后将这些内部发布者扁平化为单个 Flux,但按照源元素的顺序合并它们。

然后concatMap

将此 Flux 发出的元素异步转换为发布者,然后将这些内部发布者扁平化为单个 Flux,按顺序并使用串联保留顺序。该运算符有三个维度可以与 flatMap 和 flatMapSequential 进行比较:

内部函数的生成和订阅:该运算符等待一个内部函数完成,然后再生成下一个内部函数并订阅它。

展平值的排序:该运算符自然地保留与源元素相同的顺序,按顺序连接每个源元素的内部。

交错:此运算符不会让来自不同内部的值交错(串联)。

和 其他两者之间的区别flatMap是可以理解的,但我不明白concatMap和之间的区别何时flatMapSequential发生。两者之间有性能差异吗?我读过它flatMapSequential有一些队列的缓冲区大小,但我不明白为什么concatMap不需要它。

lka*_*ris 33

\xc2\xa0flatMapflatMapSequential\xc2\xa0 运算符急切地订阅,concatMap\xc2\xa0 在生成下一个子流并订阅它之前等待每个内部完成。

\n

让我们看一个例子:

\n
  @Test\n  void test_flatMap() {\n    Flux.just(1, 2, 3)\n        .flatMap(this::doSomethingAsync)\n        //.flatMapSequential(this::doSomethingAsync)\n        //.concatMap(this::doSomethingAsync)\n        .doOnNext(n -> log.info("Done {}", n))\n        .blockLast();\n  }\n\n  private Mono<Integer> doSomethingAsync(Integer number) {\n    //add some delay for the second item...\n    return number == 2 ? Mono.just(number).doOnNext(n -> log.info("Executing {}", n)).delayElement(Duration.ofSeconds(1))\n        : Mono.just(number).doOnNext(n -> log.info("Executing {}", n));\n  }\n
Run Code Online (Sandbox Code Playgroud)\n

输出:

\n
2022-04-22 19:38:49,164  INFO main - Executing 1\n2022-04-22 19:38:49,168  INFO main - Done 1\n2022-04-22 19:38:49,198  INFO main - Executing 2\n2022-04-22 19:38:49,200  INFO main - Executing 3\n2022-04-22 19:38:49,200  INFO main - Done 3\n2022-04-22 19:38:50,200  INFO parallel-1 - Done 2\n
Run Code Online (Sandbox Code Playgroud)\n

正如您所看到的,flatMap不保留原始顺序,并且急切地订阅了所有三个元素。另外,请注意元素 3 已在元素 2 之前进行。

\n

这是使用的输出flatMapSequential

\n
2022-04-22 19:53:40,229  INFO main - Executing 1\n2022-04-22 19:53:40,232  INFO main - Done 1\n2022-04-22 19:53:40,261  INFO main - Executing 2\n2022-04-22 19:53:40,263  INFO main - Executing 3\n2022-04-22 19:53:41,263  INFO parallel-1 - Done 2\n2022-04-22 19:53:41,264  INFO parallel-1 - Done 3\n
Run Code Online (Sandbox Code Playgroud)\n

flatMapSequential已急切地订阅了所有三个元素flatMap,但通过\xc2\xa0对无序接收的元素进行排队来保留顺序。

\n

这是使用的输出concatMap

\n
2022-04-22 19:59:31,817  INFO main - Executing 1\n2022-04-22 19:59:31,820  INFO main - Done 1\n2022-04-22 19:59:31,853  INFO main - Executing 2\n2022-04-22 19:59:32,857  INFO parallel-1 - Done 2\n2022-04-22 19:59:32,857  INFO parallel-1 - Executing 3\n2022-04-22 19:59:32,857  INFO parallel-1 - Done 3\n
Run Code Online (Sandbox Code Playgroud)\n

concatMap\xc2\xa0 自然地保留与源元素相同的顺序。

\n