Project Reactor Flux conCat、flux mergeSequential、flux mergeOrdered 之间有什么区别

SoT*_*SoT 1 java reactive-programming project-reactor reactive-streams spring-webflux

如果我们提供相同的数据源,所有这些方法都会产生相同的结果。那么它们之间有什么区别呢?

Mic*_*rry 12

采取以下(人为的)concat()示例,其中两个发布者以 100 毫秒的间隔发出 3 个元素:

Flux<Integer> a = Flux.range(0, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> b = Flux.range(0, 3).delayElements(Duration.ofMillis(100));

Flux.concat(a, b)
        .timed()
        .doOnNext(x -> System.out.println(x.get() + ": " + x.elapsed().toMillis()))
        .blockLast();
Run Code Online (Sandbox Code Playgroud)

在这里您将看到类似于以下内容的输出:

0: 138
1: 107
2: 108
0: 111
1: 102
2: 107
Run Code Online (Sandbox Code Playgroud)

所以我们以 100 毫秒的间隔发射了 6 个元素。第一个发布者被订阅,以 100 毫秒的间隔发出 3 个元素,然后完成。然后订阅第二个发布者,以 100 毫秒的间隔发出 3 个元素,然后完成。

如果我们替换为concat()mergeSequential()您将看到类似以下内容:

0: 118
1: 107
2: 107
0: 0
1: 0
2: 0
Run Code Online (Sandbox Code Playgroud)

元素以相同的顺序发出 - 但看看最后 3 个的时间!这是因为行为略有不同 - 在这种情况下,两个发布者都订阅了,因此开始以 100 毫秒的间隔发出元素。来自第一个发布者的元素在收到时发出,但来自第二个发布者的元素将被缓存,直到第一个发布者完成。

当第一个发布者完成时,第二个发布者就会接管 - 我们缓存的所有元素都会立即发出,因此没有延迟(因此计时为零。)我们发出了相同的元素,但速度更快。

这似乎是有利的,但您可能不想直接跳到依赖mergeSequential()而不是依赖有两个主要原因concat()

  • 在幕后缓存所有元素需要内存。在此示例中,有 3 个元素,几乎没有任何内存占用,但如果您开始处理数百万个元素(甚至是可能永远无法完成的发布者),您可能很快就会耗尽内存。
  • 立即订阅很可能会改变这种行为。以两个发布者为例,一个负责更改数据库中的值,另一个负责读取该值。如果将它们连接起来,写入将始终发生在读取之前。如果您同时订阅两者,则情况并非如此,您可能会在编写之前阅读它(但不一定。)

由于上述两个原因,根据我的经验,您通常只想使用concat()而不是mergeSequential()在现实世界中使用。

至于,在上面的示例中使用它,您将看到元素mergeOrdered()的实际顺序有所不同:

0: 127
0: 105
1: 17
1: 90
2: 15
2: 0
Run Code Online (Sandbox Code Playgroud)

这里的急切订阅部分mergeSequential()是相同的,但有另一个变化 - 从每个发布者发出的值在发出时进行比较,并且首先发出最小的值。因此,您将看到(在本例中)一个有序的数字流:0,0,1,1,2,2。请注意,时间安排似乎有所不同,mergeSequential()因为它在最终输出中交错来自两个发布商的值,而不仅仅是按顺序合并它们。