async{} 内部流程

And*_*d14 7 android kotlin kotlin-coroutines kotlin-flow

我可以在 kotlin flow 中使用 async{} 吗?

场景:API 调用后,我得到了需要解析的 200 个对象的列表(转换为 UIObject)。我正在尝试并行处理这个列表。下面是伪代码:

 fun getUIObjectListFlow():Flow<List<UIObject>> {
    flow<List<UIObject>> {
        while (stream.hasNext()) {
            val data = stream.getData() // reading from an input stream. Data comes in chunk

            val firstHalfDeffered = async(Dispatchers.IO) { /* process first half of the list that data obj contains*/ }
            val secondHalfDeffered = async(Dispatchers.IO) { /*process second half of the list that data obj contains */ }
            val processedList = firstHalfDeffered.await() + secondHalfDeffered.await() // just pseudo code

            emit(processedList)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

由于 async{} 需要 Coroutine 作用域(例如: someScope.async{} ),如何获取 flow 内的作用域?还有其他方法可以实现此目的吗?

该函数位于存储库中,我从视图模型中调用它。

谢谢

Jof*_*rey 12

(最初问题的原始答案)

Flow<T>正如 @broot 在评论中提到的,如果您想要生成单个项目(即使该单个项目是一个集合),则不需要 a 。一般来说,您只需要一个suspend函数(或者在本例中是一段挂起的代码)而不是返回Flow.

现在,无论您是否保留单项流,都可以使用coroutineScope { ... }挂起函数来定义可以启动协程的本地范围。这个函数做了一些事情:

  1. 它提供了启动子协程的范围
  2. 它会暂停直到所有子协程完成
  3. 它返回一个基于块中最后一个表达式的值(lambda 的“返回”值)

它可能是这样的:

val uiObjects = coroutineScope { //this: CoroutineScope
    val list = getDataFromServer()
            
    val firstHalf = async(Dispatchers.IO) { /*process first half of the list */ }
    val secondHalf = async(Dispatchers.IO) { /*process second half of the list */ }
            
    // the last expression from the block is what the uiObjects variable gets
    firstHalf.await() + secondHalf.await()
}
Run Code Online (Sandbox Code Playgroud)

编辑:鉴于问题更新,这里是一些更新的代码。您仍然应该使用coroutineScope为短期协程创建本地范围:

fun getUIObjectListFlow(): Flow<List<UIObject>> = flow<List<UIObject>> {
    while (stream.hasNext()) {
        val data = stream.getData() // reading from an input stream. Data comes in chunk

        val processedList = coroutineScope {
            val firstHalfDeffered = async(Dispatchers.IO) { /* process first half of the list that data obj contains*/ }
            val secondHalfDeffered = async(Dispatchers.IO) { /*process second half of the list that data obj contains */ }
            firstHalfDeffered.await() + secondHalfDeffered.await() 
        }
        emit(processedList)
    }
}
Run Code Online (Sandbox Code Playgroud)