Ahm*_*mal 3 kotlin kotlin-coroutines
使用Rx
一个可以合并多个订阅源,如下所示
// psudo data repository
fun getAllData(): Flowable<DataType> {
return getCachedData().mergeWith(getRemoteData())
}
fun getCachedData(): Flowable<DataType> {
// local database call
}
fun getRemoteData(): Flowable<DataType> {
// network call
}
Run Code Online (Sandbox Code Playgroud)
上面的代码getAllData()
将在合并后的一个返回后立即返回数据Flowables
,然后在准备好后发送另一个。
问题是,如何使用 Kotlin 协程实现相同的结果produce
?
您可以创建一个组合通道,produce
在其中启动两个协程,使用两个输入通道并将其重新发送到组合通道。
这是一个将多个相同类型的接收通道合并为一个的函数。
/**
* Merges multiple [channels] into one channel.
* All elements of all channels are sent to the combined channel in the order they arrive on the input channels.
*/
fun <T> CoroutineScope.mergeChannels(vararg channels: ReceiveChannel<T>) : ReceiveChannel<T> {
return produce {
channels.forEach {
launch { it.consumeEach { send(it) }}
}
}
}
Run Code Online (Sandbox Code Playgroud)
你可以这样使用它:
fun main() = runBlocking<Unit> {
val every100Ms = produce {
repeat(10) {
send("every 100: $it")
delay(100)
}
}
val every200Ms = produce {
repeat(10) {
send("every 200: $it")
delay(200)
}
}
val combined = mergeChannels(every100Ms, every200Ms)
combined.consumeEach { println(it) }
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1628 次 |
最近记录: |