当 CoroutineScope 被取消时,Coroutine StateFlow 停止发射

Tim*_*eed 9 android kotlin kotlin-coroutines

我有一个StateFlow在我的应用程序的各个部分之间共享的协程。当我cancelCoroutineScope下游收集器的,一个JobCancellationException被传播到StateFlow,并停止对所有当前和未来集电极发射值。

StateFlow

val songsRelay: Flow<List<Song>> by lazy {
    MutableStateFlow<List<Song>?>(null).apply {
        CoroutineScope(Dispatchers.IO)
            .launch { songDataDao.getAll().distinctUntilChanged().collect { value = it } }
    }.filterNotNull()
}
Run Code Online (Sandbox Code Playgroud)

我的代码中的典型“演示者”实现了以下基类:

abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val job by lazy {
        Job()
    }

    private val coroutineScope by lazy { CoroutineScope( job + Dispatchers.Main) }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        job.cancel()
        view = null
    }

    fun launch(block: suspend CoroutineScope.() -> Unit): Job {
        return coroutineScope.launch(block = block)
    }
}
Run Code Online (Sandbox Code Playgroud)

一个BasePresenter实现可能会调用launch{ songsRelay.collect {...} } 当presenter未绑定时,为了防止泄漏,我取消了父作业。任何时候收集 的presentersongsRelay StateFlow是未绑定的,theStateFlow基本上都以 a 终止JobCancellationException,并且没有其他收集器/presenter 可以从中收集值。

我注意到我可以job.cancelChildren()改为调用,这似乎有效(StateFlow不以 完成JobCancellationException)。但是我想知道job如果我不能取消工作本身,那么声明一个 parent 有什么意义。我可以完全删除job,并调用coroutineScope.coroutineContext.cancelChildren()相同的效果。

如果我只是打电话job.cancelChildren(),那够了吗?我觉得不打电话coroutineScope.cancel(),或者job.cancel(),我可能没有正确或完全清理我已经开始的任务。

我也不明白为什么在调用JobCancellationExceptionjob.cancel()会向上传播层次结构。job“父母”不在这里吗?为什么取消它会影响我的StateFlow

Car*_*mer 1

更新:

您确定您的songRelay所有演示者实际上都被取消了吗?我运行了此测试并打印了“歌曲中继完成”,因为onCompletion还捕获了下游异常。然而,在歌曲中继打印“完成”之后,Presenter 2 发出值 2 就好了。如果我取消 Presenter 2,则 Presenter 2 的作业将再次打印“歌曲中继已完成”并出现 JobCancellationException。

我确实发现有趣的是,一个流实例如何为每个订阅的收集器发出一次。我没有意识到关于流量。

    val songsRelay: Flow<Int> by lazy {
        MutableStateFlow<Int?>(null).apply {
            CoroutineScope(Dispatchers.IO)
                    .launch {
                        flow {
                            emit(1)
                            delay(1000)
                            emit(2)
                            delay(1000)
                            emit(3)
                        }.onCompletion {
                            println("Dao completed")
                        }.collect { value = it }
                    }
        }.filterNotNull()
                .onCompletion { cause ->
                    println("Song relay completed: $cause")
                }
    }

    @Test
    fun test() = runBlocking {
        val job = Job()
        val presenterScope1 = CoroutineScope(job + Dispatchers.Unconfined)
        val presenterScope2 = CoroutineScope(Job() + Dispatchers.Unconfined)

        presenterScope1.launch {
            songsRelay.onCompletion { cause ->
                println("Presenter 1 Completed: $cause")
            }.collect {
                println("Presenter 1 emits: $it")
            }
        }

        presenterScope2.launch {
            songsRelay.collect {
                println("Presenter 2 emits: $it")
            }
        }

        presenterScope1.cancel()

        delay(2000)
        println("Done test")
    }
Run Code Online (Sandbox Code Playgroud)

SupervisorJob我认为你需要在 BasePresenter 中使用而不是Job. 一般来说,Job对于整个 Presenter 来说,使用它会是一个错误,因为一个失败的协程将取消 Presenter 中的所有协程。一般不是你想要的。

  • 我刚刚有机会回到这个话题。所以 - 不,所有演示者的流程都没有被取消。我的复制测试是有缺陷的 - 我取消绑定然后重新绑定同一个演示者,并且在第二次绑定时流程停止收集 - 因为已调用“cancel()” - 这是预期的行为。哎哟! (2认同)