如何在 Android 上使用协程进行不间断取消,包括根据生命周期自动取消?

and*_*per 4 android activity-lifecycle android-lifecycle android-asynctask kotlin-coroutines

背景

AsyncTask我在从简单(已弃用)迁移到Android 时Executors遇到问题Kotlin Coroutines

问题

我无法找到如何执行我可以在AsyncTask甚至Executors使用上完成的基本操作Kotlin Coroutines

过去,我可以选择在有或没有线程中断的情况下取消任务。现在,出于某种原因,考虑到我在协程上创建的任务,它只是不间断的,这意味着如果我运行一些甚至在其中“睡眠”的代码(并不总是由我执行),它就不会被中断。

我还记得有人告诉我,协程在 Android 上非常好用,因为如果你在 Activity 中,它会自动取消所有任务。但我找不到如何做到这一点的解释。

我尝试过并发现了什么

对于协程任务(Deferred根据我所看到的调用),我想我已经读到,当我创建它时,我必须选择它将支持哪种取消,并且由于某种原因我不能同时拥有它们。不确定这是否属实,但我仍然想知道,因为我想同时拥有两者以获得最佳迁移。我曾经使用 AsyncTask 将它们添加到一个集合中(并在取消时将其删除),以便在 Activity 完成后,我可以查看所有内容并取消它们。我什至为我开设了一个很好的课程。

这是我为了测试这个而创建的:

class MainActivity : AppCompatActivity() {
    val uiScope = CoroutineScope(Dispatchers.Main)
    val bgDispatcher: CoroutineDispatcher = Dispatchers.IO

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        loadData()
    }

    private fun loadData(): Job = uiScope.launch {
        Log.d("AppLog", "loadData")
        val task = async(bgDispatcher) {
            Log.d("AppLog", "bg start")
            try {
                Thread.sleep(5000L) //this could be any executing of code, including things not editable
            } catch (e: Exception) {
                Log.d("AppLog", "$e")
            }
            Log.d("AppLog", "bg done this.isActive?${this.isActive}")
            return@async 123
        }
        //simulation of cancellation for any reason, sadly without the ability to cancel with interruption
        Handler(mainLooper).postDelayed({
            task.cancel()
        }, 2000L)
        val result: Int = task.await()
        Log.d("AppLog", "got result:$result") // this is called even if you change orientation, which I might not want when done in Activity
    }
}
Run Code Online (Sandbox Code Playgroud)

构建gradle文件:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1"
Run Code Online (Sandbox Code Playgroud)

问题

  1. 协程上是否可以有一个可以在线程中断和不中断的情况下取消的任务?
  2. 应该添加什么才能使这项工作正常进行,以便当 Activity 终止时(例如方向改变),它会自动取消(可以选择中断或不中断)?我想我可以使用与 AsyncTask 类似的解决方案,但我记得有人告诉我对于协程也有一个很好的方法。

Nic*_*las 7

协程并不神奇。它们是使用状态机实现的,并且可能有多个暂停点。这在原始协程 KEEP中都有解释。

取消发生在这些暂停点。除了挂起点(至少在正常执行中)之外,协程不能在任何其他点取消。如果您使用Thread.sleep,则没有暂停点。您应该使用delay而不是sleep,这会引入一个暂停点。如果您正在进行长时间操作,您可以添加一些yield()来添加暂停点并使您的协程可取消。

来自文档

协程取消是合作性的。协程代码必须配合才能取消。kotlinx.coroutines 中的所有挂起函数都是可以取消的。它们检查协程的取消情况,并在取消时抛出 CancellationException。但是,如果协程正在计算中并且不检查取消,则无法取消它

调用suspend函数会自动引入暂停点。

正如 @CommonsWare 指出的,很少有理由创建你的协程作用域。在活动和片段或与生命周期相关的任何组件中,您应该使用lifecycleScope. 在 ViewModel 中,有viewModelScope.

编辑:

我尝试调整源以在特定条件下runInterruptible中断:传递自定义异常类的实例InterruptionException作为取消原因将跳过线程中断。我已经用 替换了atomicfu 结构AtomicInteger,我假设你的目标只是JVM。您需要通过添加-Xopt-in=kotlinx.coroutines.InternalCoroutinesApi编译器标志来选择加入内部协程 API。

suspend fun <T> runInterruptibleCancellable(
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T = withContext(context) {
    try {
        val threadState = ThreadState(coroutineContext.job)
        threadState.setup()
        try {
            block()
        } finally {
            threadState.clearInterrupt()
        }
    } catch (e: InterruptedException) {
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
    }
}

private const val WORKING = 0
private const val FINISHED = 1
private const val INTERRUPTING = 2
private const val INTERRUPTED = 3

private class ThreadState(private val job: Job) : CompletionHandler {
    /*
       === States ===

       WORKING: running normally
       FINISH: complete normally
       INTERRUPTING: canceled, going to interrupt this thread
       INTERRUPTED: this thread is interrupted

       === Possible Transitions ===

       +----------------+         register job       +-------------------------+
       |    WORKING     |   cancellation listener    |         WORKING         |
       | (thread, null) | -------------------------> | (thread, cancel handle) |
       +----------------+                            +-------------------------+
               |                                                |   |
               | cancel                                  cancel |   | complete
               |                                                |   |
               V                                                |   |
       +---------------+                                        |   |
       | INTERRUPTING  | <--------------------------------------+   |
       +---------------+                                            |
               |                                                    |
               | interrupt                                          |
               |                                                    |
               V                                                    V
       +---------------+                              +-------------------------+
       |  INTERRUPTED  |                              |         FINISHED        |
       +---------------+                              +-------------------------+
    */
    private val _state = AtomicInteger(WORKING)
    private val targetThread = Thread.currentThread()

    // Registered cancellation handler
    private var cancelHandle: DisposableHandle? = null

    fun setup() {
        cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
        // Either we successfully stored it or it was immediately cancelled
        while (true) {
            when (val state = _state.get()) {
                // Happy-path, move forward
                WORKING -> if (_state.compareAndSet(state, WORKING)) return
                // Immediately cancelled, just continue
                INTERRUPTING, INTERRUPTED -> return
                else -> invalidState(state)
            }
        }
    }

    fun clearInterrupt() {
        /*
         * Do not allow to untriggered interrupt to leak
         */
        while (true) {
            when (val state = _state.get()) {
                WORKING -> if (_state.compareAndSet(state, FINISHED)) {
                    cancelHandle?.dispose()
                    return
                }
                INTERRUPTING -> {
                    /*
                     * Spin, cancellation mechanism is interrupting our thread right now
                     * and we have to wait it and then clear interrupt status
                     */
                }
                INTERRUPTED -> {
                    // Clear it and bail out
                    Thread.interrupted()
                    return
                }
                else -> invalidState(state)
            }
        }
    }

    // Cancellation handler
    override fun invoke(cause: Throwable?) {
        if (cause is InterruptionException) {
            while (true) {
                when (val state = _state.get()) {
                    // Working -> try to transite state and interrupt the thread
                    WORKING -> {
                        if (_state.compareAndSet(state, INTERRUPTING)) {
                            targetThread.interrupt()
                            _state.set(INTERRUPTED)
                            return
                        }
                    }
                    // Finished -- runInterruptible is already complete, INTERRUPTING - ignore
                    FINISHED, INTERRUPTING, INTERRUPTED -> return
                    else -> invalidState(state)
                }
            }
        }
    }

    private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}

class InterruptionException(cause: Throwable?) : CancellationException() {
    init {
        initCause(cause)
    }
}

fun Job.interrupt(cause: Throwable? = null) {
    this.cancel(InterruptionException(cause))
}

suspend fun Job.interruptAndJoin() {
    interrupt()
    return join()
}
Run Code Online (Sandbox Code Playgroud)

您可以使用interruptinterruptAndJoin扩展函数来触发线程中断,否则用于cancel非中断取消。一个例子:

val scope = CoroutineScope(Dispatchers.IO)
val job = scope.launch {
    runInterruptibleCancellable {
        // some blocking code
        Thread.sleep(1000)
        if (!isActive) {
            println("cancelled")
        } else {
            println("completed")
        }
    }
}
job.invokeOnCompletion {
    if (it is InterruptionException) {
        print("interrupted")
    }
}

runBlocking {
//  job.interruptAndJoin()  // prints "interrupted"
//  job.cancelAndJoin()     // prints "cancelled"
    job.join()              // prints "completed"
}
Run Code Online (Sandbox Code Playgroud)

这个例子是我所做的唯一测试。似乎有效。我不知道它是否泄漏,我不知道它是否线程安全。说实话,我真的远远超出了我的专业知识。在没有进一步确认它有效的情况下,请不要在生产中使用它。


ian*_*ake 5

默认情况下,协程不执行线程中断 - 根据使计算代码可取消文档,使用yield()或检查isActive允许协程感知代码参与取消。

但是,当与确实需要线程中断的阻塞代码进行交互时,这正是runInterruptible()的用例,这将导致在取消协程作用域时所包含的代码被线程中断。

这与生命周期感知的协程范围完美配合,当 被Lifecycle销毁时会自动取消:

class MainActivity : AppCompatActivity() {
    val bgDispatcher: CoroutineDispatcher = Dispatchers.IO

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        loadData()
    }

    private fun loadData(): Job = lifecycleScope.launch {
        Log.d("AppLog", "loadData")
        val result = runInterruptible(bgDispatcher) {
            Log.d("AppLog", "bg start")
            try {
                Thread.sleep(5000L) //this could be any executing of code, including things not editable
            } catch (e: Exception) {
                Log.d("AppLog", "$e")
            }
            Log.d("AppLog", "bg done this.isActive?${this.isActive}")
            return@runInterruptible 123
        }
        Log.d("AppLog", "got result:$result")
    }
}
Run Code Online (Sandbox Code Playgroud)