如何正确加入 CoroutineScope 中启动的所有作业

Cru*_*xel 8 kotlin kotlin-coroutines coroutinescope

GlobalScope我正在重构一些目前在基于结构化并发的方法上启动协程的 Kotlin 代码。我需要在 JVM 退出之前加入代码中启动的所有作业。我的类可以分解为以下接口:

interface AsyncTasker {
    fun spawnJob(arg: Long)
    suspend fun joinAll()
}
Run Code Online (Sandbox Code Playgroud)

用法:

fun main(args: Array<String>) {
    val asyncTasker = createAsyncTasker()

    asyncTasker.spawnJob(100)
    asyncTasker.spawnJob(200)
    asyncTasker.spawnJob(300)
    asyncTasker.spawnJob(500)

    // join all jobs as they'd be killed when the JVM exits
    runBlocking {
        asyncTasker.joinAll()
    }
}
Run Code Online (Sandbox Code Playgroud)

基于我的GlobalScope实现如下所示:

class GlobalScopeAsyncTasker : AsyncTasker {
    private val pendingJobs = mutableSetOf<Job>()

    override fun spawnJob(arg: Long) {
        var job: Job? = null
        job = GlobalScope.launch(Dispatchers.IO) {
            someSuspendFun(arg)
            pendingJobs.remove(job)
        }
        pendingJobs.add(job)
    }

    override suspend fun joinAll() {
        // iterate over a copy of the set as the
        // jobs remove themselves from the set when we join them
        pendingJobs.toSet().joinAll()
    }
}
Run Code Online (Sandbox Code Playgroud)

显然,这并不理想,因为跟踪每个待处理的作业不是很优雅,并且是旧的基于线程的编码范例的残余。

作为一种更好的方法,我正在创建自己的方法CoroutineScope,用于启动所有子项,提供SupervisorJob.

class StructuredConcurrencyAsyncTasker : AsyncTasker {

    private val parentJob = SupervisorJob()
    private val scope = CoroutineScope(Dispatchers.IO + parentJob)

    override fun spawnJob(arg: Long) {
        scope.launch {
            someSuspendFun(arg)
        }
    }

    override suspend fun joinAll() {
        parentJob.complete() // <-- why is this needed??
        parentJob.join()
    }
}
Run Code Online (Sandbox Code Playgroud)

最初开发此解决方案时,我省略了对 的调用parentJob.complete(),这导致join()无限期挂起。这感觉非常不直观,所以我正在寻找确认/输入这是否是解决此类问题的正确方法。为什么我必须手动执行complete()父作业?有没有更简单的方法来解决这个问题?

带有代码的 Kotlin 游乐场

Cru*_*xel 1

来自以下文档Job#join()

当作业因任何原因完成时,此调用将恢复 [...]

由于我从未将父作业标记为Completed,因此join永远不会返回,即使该作业的所有子作业都是Completed

这是有道理的,因为作业永远无法将状态从Completedback 切换到Active,因此,如果它在Completed所有子作业都为 时自动将状态切换到Completed,则以后不可能添加更多子作业。

感谢罗兰为我指明了正确的方向。