Kotlin SharedFlow - 如何订阅?

aSe*_*emy 4 kotlin kotlin-coroutines

我有一个生成消息的 JMS 队列。我想与多个 Kotlin 使用者共享这些消息,但前提是 Kotlin 使用者已连接。如果 Kotlin 消费者仅活跃 5 分钟,它应该只在该窗口内接收消息。Kotlin 消费者应该能够随时订阅,并随时获取收到的消息。

通过阅读文档,我认为 KotlinSharedFlow是做到这一点的最佳方法......

“SharedFlow 对于向来来往往的订阅者广播应用程序内部发生的事件非常有用。” (文档

但我找不到任何好的例子,而且文档非常混乱。文档SharedFlow说“所有收集器都会获取所有发出的值”和“共享流的活动收集器称为订阅者”,但它没有解释如何实际创建订阅者。

选项:

  • shareIn说它会将“冷流转换为热 SharedFlow”,但我没有冷流,我有热 SharedFlow。
  • Flow.collect在文档中链接,但它被标记为内部:“这是一个内部 kotlinx.coroutines API,不应从 kotlinx.coroutines 外部使用。”
  • launchIn被描述为终端 - 但我不想结束消费
amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
  override suspend fun emit(value: Message) { ... }
})
Run Code Online (Sandbox Code Playgroud)
  • 两者Flow.collectlaunchIn“永远不会正常完成” - 但我确实希望能够正常完成它们。

这是我尝试订阅消息的方法,但我永远无法得到任何结果。

amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
  override suspend fun emit(value: Message) { ... }
})
Run Code Online (Sandbox Code Playgroud)

更新

我有一个有效的解决方案。感谢 Tenfour04 的回答,这帮助我理解了。

这是一个接近我需要的示例。


import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
  produceMessages()
  delay(1000)
}

suspend fun produceMessages() = coroutineScope {

  val messages = MutableSharedFlow<Int>(
    replay = 0,
    extraBufferCapacity = 0,
    onBufferOverflow = BufferOverflow.SUSPEND
  )

  // emit messages
  launch {
    repeat(100000) {
      println("emitting $it - result:${messages.tryEmit(it)}")
      delay(Duration.seconds(0.5))
    }
  }

  println("waiting 3")
  delay(Duration.seconds(3))

  launch {
    messages.onEach { println("onEach") }
  }
  launch {
    messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
  }
  launch {
    messages.collect { println("collect") }
  }

  launch {
    messages.launchIn(this)
    messages.collect { println("launchIn + collect") }
  }

  launch {
    val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
    delay(Duration.seconds(2))
    println("new.replayCache: ${new.replayCache}")
  }

  launch {
    println("sharing")
    val l = mutableListOf<Int>()
    val x = messages.onEach { println("hello") }.launchIn(this)

    repeat(1000) {
      delay(Duration.seconds(1))

      println("result $it: ${messages.replayCache}")
      println("result $it: ${messages.subscriptionCount.value}")
      println("result $it: ${l}")
    }
  }
}

Run Code Online (Sandbox Code Playgroud)

Ten*_*r04 5

Flow.collect有一个标记为内部的重载,但有一个collect非常常用的公共扩展函数。我建议将这个包罗万象的导入放在文件的顶部,然后扩展函数将在其他与 Flow 相关的任务中可用:import kotlinx.coroutines.flow.*

launchIncollect是订阅流的两种最常见的方式。他们都是终点站。“终端”并不意味着它结束消费......它意味着它开始消费!“非终结符”函数是将一个 Flow 包装在另一个 Flow 中而不开始收集它的函数。

“永远不会正常完成”意味着协程中紧随其后的代码将无法到达。collect订阅流并挂起协程,直到流完成。由于 SharedFlow 永远不会完成,因此它“永远不会正常完成”。

很难对您的代码进行评论,因为在同一函数中启动流程并收集它是不寻常的。通常,SharedFlow 将作为属性公开以供其他函数使用。通过将它们全部组合到一个函数中,您隐藏了这样一个事实:通常 SharedFlow 可能是从与收集它的协程范围不同的协程范围发布的。

这是一个部分改编自您的代码的示例:

class Publisher {
    private val publishingScope = CoroutineScope(SupervisorJob())

    val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    ).also { flow ->
        // emit messages
        publishingScope.launch {
            repeat(100000) {
                println("emitting $it")
                flow.emit(it)
                delay(500)
            }
        }
    }
}

fun main() {
    val publisher = Publisher()
    runBlocking {
        val subscribingScope = CoroutineScope(SupervisorJob())

        // Delay a while. We'll miss the first couple messages.
        delay(1300)

        // Subscribe to the shared flow
        subscribingScope.launch {
            publisher.messagesFlow.collect { println("I am colllecting message $it") }
            // Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
        }

        delay(3000) // Keep app alive for a while
    }
}
Run Code Online (Sandbox Code Playgroud)

由于collect通常会阻止其下面的任何代码在协程中运行,因此该launchIn函数可以使正在发生的事情更加明显,并且更加简洁:

fun main() {
    val publisher = Publisher()
    runBlocking {
        val subscribingScope = CoroutineScope(SupervisorJob())

        delay(1300)
        
        publisher.messagesFlow.onEach { println("I am colllecting message $it") }
            .launchIn(subscribingScope)

        delay(3000)
    }
}
Run Code Online (Sandbox Code Playgroud)