如果创建热门发布者,flux cache()、replay() 和publish() 有什么区别?

gin*_*dex 1 project-reactor spring-webflux

cache()Flux和replay()ifpublish()创建热门发布者有什么区别?对于哪种用例,哪种运算符最适合?

以下示例使用 3 种不同的方法重播所有 5 个元素。

cache()

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).cache();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)

replay()

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).replay();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        flux.connect();

        Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)

publish()

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).publish();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        flux.connect();

        Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)

打印结果的一种变化:

First: 1
First: 2
First: 3
First: 4
Second: 1
Second: 2
Second: 3
Second: 4
First: 5
Second: 5
Run Code Online (Sandbox Code Playgroud)

Sim*_*slé 7

cache()是 的方便别名.replay().autoConnect(1),即 它将执行connect()一旦第一个订阅者进来,

但由于它重播了整个历史记录,第二个订阅者仍然可以看到所有元素。

从你的例子replay()来看publish(),你可能认为两者没有区别。但那是因为你connect()在两个订阅者都订阅之后......

如果您要将connect()呼叫移至第二个订阅者之前,您会发现在这种情况下publish()看不到任何价值。replay()另一方面,将向第二个订阅者重播整个历史记录,尽管它迟到了。