我想像分享 Flux 一样“分享”Mono。
Kotlin 的 Flux share() 示例:
fun `test flux share`() {
val countDownLatch = CountDownLatch(2)
val originalFlux = Flux.interval(Duration.ofMillis(200))
.map { "$it = ${Instant.now()}" }
.take(7)
.share()
.doOnTerminate {
countDownLatch.countDown()
}
println("Starting #1...")
originalFlux.subscribe {
println("#1: $it")
}
println("Waiting ##2...")
CountDownLatch(1).await(1000, TimeUnit.MILLISECONDS)
println("Starting ##2...")
originalFlux.subscribe {
println("##2: $it")
}
countDownLatch.await(10, TimeUnit.SECONDS)
println("End!")
}
Run Code Online (Sandbox Code Playgroud)
我找不到 Mono 的 share() 运算符。为什么它不存在?
\n\n\n我找不到 Mono 的 share() 运算符。为什么它不存在?
\n
的具体行为share()对于 Mono 来说没有多大意义,但我们拥有cache()的可能正是您所追求的。
share()相当于你调用publish().refcount()你的 Flux。具体来说,publish()给你一个ConnectableFlux,或“热”通量。(refcount()只是根据第一个/最后一个订阅者自动连接/停止通量。)
“raison d\'\xc3\xaatre”ConnectableFlux允许多个订阅者随时订阅,而忽略他们订阅之前发出的数据。如果是Mono没有多大意义,因为根据定义,只有一个值被发出 - 所以如果你错过了它,那么你就错过了它。
然而,我们在 Mono 上确实有cache(),这也将它变成了“热”源(其中原始供应商不会为每个订阅调用,只在第一次订阅时调用一次。)与上面的明显区别是值会为每个订阅者重播,但这几乎肯定是您想要的。
\n\n(旁注,如果您测试上述内容 - 请注意,您需要使用Mono.fromSupplier()而不是Mono.just(),因为后者只会在实例化时获取一次值,因此cache()没有任何有意义的效果。)
| 归档时间: |
|
| 查看次数: |
1846 次 |
| 最近记录: |