合并Kotlin流

And*_*ndy 0 kotlin kotlin-coroutines

给定2个或多个相同类型的流,是否存在现有的Kotlin协程函数来合并它们,例如RX合并运算符?

目前我正在考虑:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    val flowJobs = flows.map { flow ->
        GlobalScope.launch { flow.collect { send(it) } }
    }
    flowJobs.joinAll()
}
Run Code Online (Sandbox Code Playgroud)

但看起来有些笨拙。

Jac*_*s.S 18

这是现在(撰写本文时协程版本 1.3.5)协程库的一部分。

你像这样使用它:

val flowA = flow { emit(1) } 
val flowB = flow { emit(2) }

merge(flowA, flowB).collect{ println(it) } // Prints two integers
// or:
listOf(flowA, flowB).merge().collect { println(it) } // Prints two integers
Run Code Online (Sandbox Code Playgroud)

您可以在源代码中阅读更多内容


mar*_*ran 5

I'm not too familiar with flows yet, so this might be suboptimal. Anyway, I think you could create a flow of all your input flows, and then use flattenMerge to flatten them into a single flow again. Something like this:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()
Run Code Online (Sandbox Code Playgroud)

  • 您应该注意,与问题中提供的代码不同,默认限制为同时运行的 16 个流,请参阅 [`flattenMerge`](https://kotlin.github.io/kotlinx.coroutines) 中的 `concurrency` 参数/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html)。 (2认同)