Fra*_*Lee 4 kotlin kotlin-coroutines kotlin-coroutines-flow
我正在处理通过回调到达的热事件流。“下游”我想将其拆分为多个流,并对其进行处理。事件都从单个线程(我无法控制,因此我不认为我可以在这里使用协同例程)按顺序到达在这里使用正确的结构?
我可以很容易地创建一个 Flow,使用 callbackFlow 和 sendBlocking,但语义似乎不一致,因为 Flow 并不冷。将流拆分为多个下游流的最佳方法是什么(取决于事件的内容)。还是我应该使用渠道?它与我的来源的“热度”相匹配,但整个下游轮询似乎关闭(在这种基本上是同步的情况下),并且许多方法似乎已被弃用而支持 Flow。
我可以通过使用“一路回调”来完成所有这些,但这会产生比我想要的更紧密的耦合。有任何想法吗?
编辑:
我结束了这个,似乎工作:
fun testFlow() {
runBlocking {
val original = flowOf("aap", "noot", "mies", "wim", "zus","jet","weide","does")
val broadcast = original.broadcastIn(this)
val flow1 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 4 }
val flow2 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 3 }
flow1.collect { it -> println("Four letter: ${it}") }
flow2.collect { it -> println("Three letter: ${it}") }
}
}
Run Code Online (Sandbox Code Playgroud)
Jof*_*rey 14
SharedFlow这个用例很快就会很热,但与此同时,你可以BroadcastChannel在封面下使用。
您可以callbackFlow从基于回调的 API 创建冷流开始(请参阅 Roman Elizarov 的关于它的帖子)。然后使用以下方法使其热起来并分享它:
val original: Flow<String> = TODO("get original flow")
// create an implicit hot BroadcastChannel, shared between collectors
val sharedFlow = original.broadcastIn(scope).asFlow()
// create derived cold flows, which will subscribe (on collect) to the
// same hot source (BroadcastChannel)
val flow1 = sharedFlow.filter { it.length == 4 }
val flow2 = sharedFlow.filter { it.length == 3 }.map { it.toUppercase() }
flow1.collect { it -> println("Four letter: ${it}") }
flow2.collect { it -> println("Three letter: ${it}") }
Run Code Online (Sandbox Code Playgroud)
首先要澄清一下,即使Flows 目前大部分是冷的,也已经有一个 hot StateFlow,很快就会有一个方便的shareoperator和一个 hotSharedFlow来简化这种用例。
在我们等待的过程中,如果您最初有感冒Flow,您目前必须首先创建一个热通道(以及一个向其发送元素的协程),从中我们可以导出共享热源的流。这可以通过以下方式之一轻松完成:
Flow.produceIn(scope)在给定范围内启动一个协程,并为您提供ReceiveChannel(对扇出有用,见下文)Flow.broadcastIn(scope)在给定范围内启动一个协程,并为您提供BroadcastChannel(用于实际共享,见下文)一旦你有了一个热通道,你就可以将它转换成一个流并获得不同的行为:
ReceiveChannel.consumeAsFlow()Flow从热源创建 a ,但它只能collect由单个收集器 -ed(否则抛出)ReceiveChannel.receiveAsFlow()创建一个 multi-collector Flow,但它以扇出方式运行(源通道中的每个元素只发送给一个消费者)BroadcastChannel.asFlow()创建一个多收集器Flow,其中每个收集器获取所有元素(有效共享)。调用在 上collect创建一个新订阅BroadcastChannel,并正确处理取消。StateFlow这不是您的用例,但有时您可能不需要流中的所有值,而是最新的当前状态和状态更新。
这曾经是通过 a 完成的ConflatedBroadcastChannel,但现在您可以使用 aStateFlow来表示这一点(自协程 1.3.6 起):
MutableStateFlowequality)。| 归档时间: |
|
| 查看次数: |
2216 次 |
| 最近记录: |