Kotlin Coroutines-如何阻止等待/加入所有工作?

Ste*_*kks 3 kotlin kotlinx.coroutines

我是Kotlin / Coroutines的新手,所以希望我只是缺少一些东西/无法完全理解如何为我要解决的问题构建代码。

本质上,我要获取一个字符串列表,对于列表中的每个项目,我都希望将其发送到另一种方法来工作(进行网络调用并根据响应返回数据)。(编辑:)我希望所有调用同时启动,并阻塞直到所有调用完成/响应已执行,然后返回包含每个响应信息的新列表。

我可能还不太了解何时使用启动/异步,但是我尝试遵循启动(带有joinAll)和异步(带有await)两者。

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstofReturnData
Run Code Online (Sandbox Code Playgroud)

我期望发生的是,如果我lstInputs的大小为120,那么当所有工作加入后,我lstOfReturnData的大小也应该为120。

实际发生的是结果不一致。我将运行一次,然后在最终列表中获得118,再次运行,它是120,再次运行,它是117,依此类推。在该networkCallToGetData()方法中,我正在处理任何异常,至少为每个请求返回一些内容,无论网络通话是否失败。

谁能帮我解释为什么我得到的结果不一致,以及我需要做些什么来确保我进行适当的阻止并继续所有工作,然后再继续?

小智 9

mutableListOf()创建一个不是线程安全的ArrayList。
尝试改用ConcurrentLinkedQueue。
另外,您是否使用Kotlin / Kotlinx.coroutine的稳定版本(而不是旧的实验版本)?在稳定版本中,由于引入了结构化并发,因此无需编写jobs.joinAll anymorelaunch是一个扩展功能runBlocking,它将在的范围内启动新的协程,runBlocking并且runBlocking范围将自动等待所有已启动的作业完成。所以上面的代码可以简化为

    val lstOfReturnData = ConcurrentLinkedQueue<response>()
    runBlocking {
            lstInputs.forEach {
                launch(Dispatches.IO) {
                    lstOfReturnData.add(networkCallToGetData(it))
                }
            }
    }
    return lstOfReturnData 
Run Code Online (Sandbox Code Playgroud)