将多个 Kotlin 流合并到一个列表中,无需等待第一个值

Mar*_*say 9 kotlin kotlin-coroutines kotlin-flow

我有一个List<Flow<T>>,想生成一个Flow<List<T>>. 这几乎是什么combine- 除了组合等待每个都Flow发出初始值,这不是我想要的。以这段代码为例:

val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}
Run Code Online (Sandbox Code Playgroud)

使用combine(因此按原样),这是输出:

[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)

而我也对所有中间步骤感兴趣。这就是我想要从这三个流程中得到的:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)

现在我有两个变通方法,但它们都不是很好......第一个很丑陋并且不适用于可空类型:

[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)

通过强制所有流发出第一个不相关的值,combine确实调用了转换器,并让我删除我知道不是实际值的空值。迭代那个,更易读但更重:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Run Code Online (Sandbox Code Playgroud)

现在这个工作得很好,但仍然感觉我做得太过分了。协程库中是否有我缺少的方法?

Wil*_*zel 8

这个怎么样:

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array= Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

它解决了几个问题:

  • 无需引入新类型
  • [] 不在结果流中
  • 从调用站点中抽象出空处理(或无论如何解决),生成的 Flow 会自行处理

因此,您不会注意到任何特定于实现的解决方法,因为您不必在收集期间处理它:

runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}
Run Code Online (Sandbox Code Playgroud)

输出:

[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

在这里试试吧!

编辑:更新了处理也发出空值的流的答案。


* 使用的低级数组是线程安全的。就好像您在处理单个变量一样。