pau*_*aul 4 java spring reactor flux
我在玩 Spring reactor,我看不出concat和mergeoperator之间有什么区别
这是我的例子
@Test
public void merge() {
Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux3 = Flux.just("world");
Flux.merge(flux1, flux2, flux3)
.map(String::toUpperCase)
.subscribe(System.out::println);
}
@Test
public void concat() {
Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux3 = Flux.just("world");
Flux.concat(flux1, flux2, flux3)
.map(String::toUpperCase)
.subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
两者的行为完全相同。有人可以解释这两种操作之间的区别吗?
API 文档中已经提到了区别,虽然concat首先完全读取一个通量,然后将第二个通量附加到该通量,但合并运算符不保证两个通量之间的顺序。
为了查看差异,请修改您的 merge() 代码,如下所示。
例如下面的示例代码
//Flux with Delay
@Test
public void merge() {
Flux<String> flux1 = Flux.just("Hello", "Vikram");
flux1 = Flux.interval(Duration.ofMillis(3000))
.zipWith(flux1, (i, msg) -> msg);
Flux<String> flux2 = Flux.just("reactive");
flux2 = Flux.interval(Duration.ofMillis(2000))
.zipWith(flux2, (i, msg) -> msg);
Flux<String> flux3 = Flux.just("world");
Flux.merge(flux1, flux2, flux3)
.subscribe(System.out::println);
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
当您修改 Flux.interval 持续时间(当前设置为 3000 毫秒)时,您将看到 merge() 的输出不断变化。但是使用 concat(),输出将始终相同。
| 归档时间: |
|
| 查看次数: |
6577 次 |
| 最近记录: |