gin*_*dex 1 project-reactor spring-webflux
cache()Flux和replay()ifpublish()创建热门发布者有什么区别?对于哪种用例,哪种运算符最适合?
以下示例使用 3 种不同的方法重播所有 5 个元素。
cache():
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).cache();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)
replay():
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).replay();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
flux.connect();
Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)
publish():
var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
.delayElements(Duration.ofSeconds(1)).publish();
flux.doOnNext(v -> System.out.println("First: " + v))
.subscribe();
Thread.sleep(5000);
flux.doOnNext(v -> System.out.println("Second: " + v))
.subscribe();
flux.connect();
Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)
打印结果的一种变化:
First: 1
First: 2
First: 3
First: 4
Second: 1
Second: 2
Second: 3
Second: 4
First: 5
Second: 5
Run Code Online (Sandbox Code Playgroud)
cache()是 的方便别名.replay().autoConnect(1),即 它将执行connect()一旦第一个订阅者进来,
但由于它重播了整个历史记录,第二个订阅者仍然可以看到所有元素。
从你的例子replay()来看publish(),你可能认为两者没有区别。但那是因为你connect()在两个订阅者都订阅之后......
如果您要将connect()呼叫移至第二个订阅者之前,您会发现在这种情况下publish()看不到任何价值。replay()另一方面,将向第二个订阅者重播整个历史记录,尽管它迟到了。
| 归档时间: |
|
| 查看次数: |
1799 次 |
| 最近记录: |