为什么 Project Reactor 的 Mono 没有 share() 运算符?

Tia*_*oto 3 project-reactor

我想像分享 Flux 一样“分享”Mono。

Kotlin 的 Flux share() 示例:

fun `test flux share`() {
        val countDownLatch = CountDownLatch(2)

        val originalFlux = Flux.interval(Duration.ofMillis(200))
                .map { "$it = ${Instant.now()}" }
                .take(7)
                .share()
                .doOnTerminate {
                    countDownLatch.countDown()
                }


        println("Starting #1...")

        originalFlux.subscribe {
            println("#1: $it")
        }

        println("Waiting ##2...")
        CountDownLatch(1).await(1000, TimeUnit.MILLISECONDS)
        println("Starting ##2...")

        originalFlux.subscribe {
            println("##2: $it")
        }

        countDownLatch.await(10, TimeUnit.SECONDS)

        println("End!")
    }
Run Code Online (Sandbox Code Playgroud)

我找不到 Mono 的 share() 运算符。为什么它不存在?

Mic*_*rry 5

\n

我找不到 Mono 的 share() 运算符。为什么它不存在?

\n
\n\n

的具体行为share()对于 Mono 来说没有多大意义,但我们拥有cache()的可能正是您所追求的。

\n\n

share()相当于你调用publish().refcount()你的 Flux。具体来说,publish()给你一个ConnectableFlux,或“热”通量。(refcount()只是根据第一个/最后一个订阅者自动连接/停止通量。)

\n\n

“raison d\'\xc3\xaatre”ConnectableFlux允许多个订阅者随时订阅,而忽略他们订阅之前发出的数据。如果是Mono没有多大意义,因为根据定义,只有一个值被发出 - 所以如果你错过了它,那么你就错过了它。

\n\n

然而,我们在 Mono 上确实有cache(),这也将它变成了“热”源(其中原始供应商不会为每个订阅调用,只在第一次订阅时调用一次。)与上面的明显区别是值会为每个订阅者重播,但这几乎肯定是您想要的。

\n\n

(旁注,如果您测试上述内容 - 请注意,您需要使用Mono.fromSupplier()而不是Mono.just(),因为后者只会在实例化时获取一次值,因此cache()没有任何有意义的效果。)

\n