Mar*_*say 9 kotlin kotlin-coroutines kotlin-flow
我有一个List<Flow<T>>,想生成一个Flow<List<T>>. 这几乎是什么combine- 除了组合等待每个都Flow发出初始值,这不是我想要的。以这段代码为例:
val a = flow {
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
combine(flows) {
it.toList()
}.collect { println(it) }
}
Run Code Online (Sandbox Code Playgroud)
使用combine(因此按原样),这是输出:
[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)
而我也对所有中间步骤感兴趣。这就是我想要从这三个流程中得到的:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)
现在我有两个变通方法,但它们都不是很好......第一个很丑陋并且不适用于可空类型:
[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)
通过强制所有流发出第一个不相关的值,combine确实调用了转换器,并让我删除我知道不是实际值的空值。迭代那个,更易读但更重:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)
现在这个工作得很好,但仍然感觉我做得太过分了。协程库中是否有我缺少的方法?
这个怎么样:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
val array= Array(flows.size) {
false to (null as T?) // first element stands for "present"
}
flows.forEachIndexed { index, flow ->
launch {
flow.collect { emittedElement ->
array[index] = true to emittedElement
send(array.filter { it.first }.map { it.second })
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
它解决了几个问题:
[] 不在结果流中因此,您不会注意到任何特定于实现的解决方法,因为您不必在收集期间处理它:
runBlocking {
instantCombine(a, b, c).collect {
println(it)
}
}
Run Code Online (Sandbox Code Playgroud)
输出:
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
编辑:更新了处理也发出空值的流的答案。
* 使用的低级数组是线程安全的。就好像您在处理单个变量一样。
| 归档时间: |
|
| 查看次数: |
6900 次 |
| 最近记录: |