SoT*_*SoT 1 java reactive-programming project-reactor reactive-streams spring-webflux
如果我们提供相同的数据源,所有这些方法都会产生相同的结果。那么它们之间有什么区别呢?
Mic*_*rry 12
采取以下(人为的)concat()
示例,其中两个发布者以 100 毫秒的间隔发出 3 个元素:
Flux<Integer> a = Flux.range(0, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> b = Flux.range(0, 3).delayElements(Duration.ofMillis(100));
Flux.concat(a, b)
.timed()
.doOnNext(x -> System.out.println(x.get() + ": " + x.elapsed().toMillis()))
.blockLast();
Run Code Online (Sandbox Code Playgroud)
在这里您将看到类似于以下内容的输出:
0: 138
1: 107
2: 108
0: 111
1: 102
2: 107
Run Code Online (Sandbox Code Playgroud)
所以我们以 100 毫秒的间隔发射了 6 个元素。第一个发布者被订阅,以 100 毫秒的间隔发出 3 个元素,然后完成。然后订阅第二个发布者,以 100 毫秒的间隔发出 3 个元素,然后完成。
如果我们替换为concat()
,mergeSequential()
您将看到类似以下内容:
0: 118
1: 107
2: 107
0: 0
1: 0
2: 0
Run Code Online (Sandbox Code Playgroud)
元素以相同的顺序发出 - 但看看最后 3 个的时间!这是因为行为略有不同 - 在这种情况下,两个发布者都订阅了,因此开始以 100 毫秒的间隔发出元素。来自第一个发布者的元素在收到时发出,但来自第二个发布者的元素将被缓存,直到第一个发布者完成。
当第一个发布者完成时,第二个发布者就会接管 - 我们缓存的所有元素都会立即发出,因此没有延迟(因此计时为零。)我们发出了相同的元素,但速度更快。
这似乎是有利的,但您可能不想直接跳到依赖mergeSequential()
而不是依赖有两个主要原因concat()
:
由于上述两个原因,根据我的经验,您通常只想使用concat()
而不是mergeSequential()
在现实世界中使用。
至于,在上面的示例中使用它,您将看到元素mergeOrdered()
的实际顺序有所不同:
0: 127
0: 105
1: 17
1: 90
2: 15
2: 0
Run Code Online (Sandbox Code Playgroud)
这里的急切订阅部分mergeSequential()
是相同的,但有另一个变化 - 从每个发布者发出的值在发出时进行比较,并且首先发出最小的值。因此,您将看到(在本例中)一个有序的数字流:0,0,1,1,2,2
。请注意,时间安排似乎有所不同,mergeSequential()
因为它在最终输出中交错来自两个发布商的值,而不仅仅是按顺序合并它们。
归档时间: |
|
查看次数: |
1699 次 |
最近记录: |