Kotlin 中的延迟函数如何在不阻塞当前线程的情况下工作?

Pon*_*rai 6 coroutine kotlin

这几天在学习协程,大部分概念都清楚了,就是不明白延迟函数的实现。

延迟函数如何在延迟时间后恢复协程?对于一个简单的程序,只有一个主线程,为了在延迟时间后恢复协程,我假设应该有另一个计时器线程来处理所有延迟的调用并稍后调用它们。这是真的吗?有人能解释一下延迟函数的实现细节吗?

Ani*_*ahu 6

TL; 博士;

当使用 runBlocking 时,延迟在内部包装并在同一线程上运行,当使用任何其他调度程序时,它会挂起并通过事件循环线程恢复延续来恢复。检查下面的长答案以了解内部结构。

长答案:

@Francesc 答案指向正确,但有些抽象,并且仍然没有解释延迟在内部的实际工作原理。

因此,正如他指出延迟函数:

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}
Run Code Online (Sandbox Code Playgroud)

它的作用是“获取挂起函数内的当前延续实例,并在运行 lambda 内的块后挂起当前正在运行的协程”

因此,该行将cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)被执行,然后当前协程被挂起,即释放它所粘连的当前线程。

cont.context.delay指着

internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
Run Code Online (Sandbox Code Playgroud)

也就是说,如果ContinuationInterceptor是 Delay 的实现,则返回,否则使用 DefaultDelay,它是internal actual val DefaultDelay: Delay = DefaultExecutor一个 DefaultExecutor,它是internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {...}EventLoop 的实现,并且有自己的线程可以运行。

注意:ContinuationInterceptor是协程位于 runBlocking 块中的实现Delay,以确保延迟在同一线程上运行,否则不是。检查此代码片段以查看结果。

现在我找不到 runBlocking 创建的 Delay 的实现,因为internal expect fun createEventLoop(): EventLoop这是一个从外部实现的期望函数,而不是由源实现。但其DefaultDelay实现如下

public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val timeNanos = delayToNanos(timeMillis)
    if (timeNanos < MAX_DELAY_NS) {
        val now = nanoTime()
        DelayedResumeTask(now + timeNanos, continuation).also { task ->
            continuation.disposeOnCancellation(task)
            schedule(now, task)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是如何scheduleResumeAfterDelay实现的,它创建一个DelayedResumeTask带有延迟传递的延续,然后调用schedule(now, task)哪个调用scheduleImpl(now, delayedTask),最后调用delayedTask.scheduleTask(now, delayedQueue, this)传递delayedQueue对象

@Synchronized
fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
    if (_heap === kotlinx.coroutines.DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
    delayed.addLastIf(this) { firstTask ->
        if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
        /**
         * We are about to add new task and we have to make sure that [DelayedTaskQueue]
         * invariant is maintained. The code in this lambda is additionally executed under
         * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
         */
        if (firstTask == null) {
            /**
             * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
             * the current now time even if that means "going backwards in time". This makes the structure
             * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
             * are removed from the delayed queue for execution.
             */
            delayed.timeNow = now
        } else {
            /**
             * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
             * and only goes forward in time. We cannot let it go backwards in time or invariant can be
             * violated for tasks that were already scheduled.
             */
            val firstTime = firstTask.nanoTime
            // compute min(now, firstTime) using a wrap-safe check
            val minTime = if (firstTime - now >= 0) now else firstTime
            // update timeNow only when going forward in time
            if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
        }
        /**
         * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
         * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
         * function can be called to reschedule from one queue to another and this might be another reason
         * where new task's time might now violate invariant.
         * We correct invariant violation (if any) by simply changing this task's time to now.
         */
        if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
        true
    }
    return SCHEDULE_OK
}
Run Code Online (Sandbox Code Playgroud)

它最终将任务设置为DelayedTaskQueue当前时间。

// Inside DefaultExecutor
override fun run() {
    ThreadLocalEventLoop.setEventLoop(this)
    registerTimeLoopThread()
    try {
        var shutdownNanos = Long.MAX_VALUE
        if (!DefaultExecutor.notifyStartup()) return
        while (true) {
            Thread.interrupted() // just reset interruption flag
            var parkNanos = DefaultExecutor.processNextEvent() /* Notice here, it calls the processNextEvent */
            if (parkNanos == Long.MAX_VALUE) {
                // nothing to do, initialize shutdown timeout
                if (shutdownNanos == Long.MAX_VALUE) {
                    val now = nanoTime()
                    if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + DefaultExecutor.KEEP_ALIVE_NANOS
                    val tillShutdown = shutdownNanos - now
                    if (tillShutdown <= 0) return // shut thread down
                    parkNanos = parkNanos.coerceAtMost(tillShutdown)
                } else
                    parkNanos = parkNanos.coerceAtMost(DefaultExecutor.KEEP_ALIVE_NANOS) // limit wait time anyway
            }
            if (parkNanos > 0) {
                // check if shutdown was requested and bail out in this case
                if (DefaultExecutor.isShutdownRequested) return
                parkNanos(this, parkNanos)
            }
        }
    } finally {
        DefaultExecutor._thread = null // this thread is dead
        DefaultExecutor.acknowledgeShutdownIfNeeded()
        unregisterTimeLoopThread()
        // recheck if queues are empty after _thread reference was set to null (!!!)
        if (!DefaultExecutor.isEmpty) DefaultExecutor.thread // recreate thread if it is needed
    }
}
Run Code Online (Sandbox Code Playgroud)
// Called by run inside the run of DefaultExecutor
override fun processNextEvent(): Long {
    // unconfined events take priority
    if (processUnconfinedEvent()) return nextTime
    // queue all delayed tasks that are due to be executed
    val delayed = _delayed.value
    if (delayed != null && !delayed.isEmpty) {
        val now = nanoTime()
        while (true) {
            // make sure that moving from delayed to queue removes from delayed only after it is added to queue
            // to make sure that 'isEmpty' and `nextTime` that check both of them
            // do not transiently report that both delayed and queue are empty during move
            delayed.removeFirstIf {
                if (it.timeToExecute(now)) {
                    enqueueImpl(it)
                } else
                    false
            } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
        }
    }
    // then process one event from queue
    dequeue()?.run()
    return nextTime
}
Run Code Online (Sandbox Code Playgroud)

然后,finally 的事件循环(run 函数)internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {...}通过​​使任务出队并在Continuation达到延迟时间时通过调用延迟来恢复实际暂停的任务来处理任务。


Fra*_*esc 2

所有挂起函数的工作方式都相同,编译时它会转换为带有回调的状态机。

当你调用时,delay会发生这样的情况:一条消息以一定的延迟发布到队列上,类似于Handler().postDelayed(delay),当延迟过去后,它会回调到挂起点并恢复执行。

您可以检查该delay函数的源代码以了解其工作原理:

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}
Run Code Online (Sandbox Code Playgroud)

因此,如果延迟为正,则会在延迟时间内安排回调。