Spring Reactor Merge 与 Concat

pau*_*aul 4 java spring reactor flux

我在玩 Spring reactor,我看不出concatmergeoperator之间有什么区别

这是我的例子

    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }

    @Test
    public void concat() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.concat(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);    
}
Run Code Online (Sandbox Code Playgroud)

两者的行为完全相同。有人可以解释这两种操作之间的区别吗?

Amr*_*eya 13

merge 和 concat 的本质区别在于,在 merge 中,两个流都是实时的。在 concat 的情况下,第一个流被终止,然后另一个流连接到它。

康卡特 在此处输入图片说明


合并 在此处输入图片说明


Vik*_*wat 7

API 文档中已经提到了区别,虽然concat首先完全读取一个通量,然后将第二个通量附加到该通量,但合并运算符不保证两个通量之间的顺序。

为了查看差异,请修改您的 merge() 代码,如下所示。

例如下面的示例代码

//Flux with Delay
@Test
public void merge() {
    Flux<String> flux1 = Flux.just("Hello", "Vikram");

    flux1 = Flux.interval(Duration.ofMillis(3000))
    .zipWith(flux1, (i, msg) -> msg);
    
    
    Flux<String> flux2 = Flux.just("reactive");
    flux2 = Flux.interval(Duration.ofMillis(2000))
            .zipWith(flux2, (i, msg) -> msg);
    
    Flux<String> flux3 = Flux.just("world");
    Flux.merge(flux1, flux2, flux3)
            .subscribe(System.out::println);

    try {
        Thread.sleep(8000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
Run Code Online (Sandbox Code Playgroud)

当您修改 Flux.interval 持续时间(当前设置为 3000 毫秒)时,您将看到 merge() 的输出不断变化。但是使用 concat(),输出将始终相同。