Kotlin 中哪些用例适合 Dispatchers.Default?

ali*_*ari 5 multithreading threadpool kotlin kotlin-coroutines

根据文档,线程池大小IODefault调度程序的行为如下:

  • Dispatchers.Default:默认情况下,该调度程序使用的最大并行级别等于 CPU 核心数,但至少为两个。
  • Dispatchers.IO:默认为64个线程或核心数的限制(以较大者为准)。

除非我遗漏了一条信息,否则执行大量 CPU 密集型工作Default会更高效(更快),因为上下文切换发生的频率会降低

但下面的代码实际上在 上运行得更快Dispatchers.IO

fun blockingWork() {
    val startTime = System.currentTimeMillis()
    while (true) {
        Random(System.currentTimeMillis()).nextDouble()
        if (System.currentTimeMillis() - startTime > 1000) {
            return
        }
    }
}

fun main() = runBlocking {
    val startTime = System.nanoTime()
    val jobs = (1..24).map { i ->
        launch(Dispatchers.IO) { // <-- Select dispatcher here
            println("Start #$i in ${Thread.currentThread().name}")
            blockingWork()
            println("Finish #$i in ${Thread.currentThread().name}")
        }
    }
    jobs.forEach { it.join() }
    println("Finished in ${Duration.of(System.nanoTime() - startTime, ChronoUnit.NANOS)}")
}
Run Code Online (Sandbox Code Playgroud)

我在 8 核 CPU 上运行 24 个作业(因此,我可以让调度程序的所有线程保持Default忙碌)。这是我机器上的结果:

Dispatchers.IO --> Finished in PT1.310262657S
Dispatchers.Default --> Finished in PT3.052800858S
Run Code Online (Sandbox Code Playgroud)

你能告诉我我在这里缺少什么吗?如果效果更好,为什么我应该使用除(或任何具有大量线程的线程池)IO之外的任何调度程序。IO

Ale*_*nek 3

回答您的问题:调度程序最适合不具有阻塞功能的任务,因为在并发执行此类工作负载时(并发和并行执行之间的差异Default) ,超过最大并行度不会带来任何好处。

https://www.cs.uic.edu/~jbell/CourseNotes/OperatingSystems/5_CPU_Scheduling.html


你的实验有缺陷。正如评论中已经提到的,您的blockingWork不受 CPU 限制,而是 IO 限制。这一切都与等待有关——任务被阻塞并且 CPU 无法执行其后续指令的时期。你的blockingWork本质只是“等待 1000 毫秒”,并行等待 1000 毫秒 X 次将比顺序执行更快。您执行一些计算(生成随机数 - 本质上也可能是 IO 绑定的),但正如已经指出的,您的工作人员会生成或多或少的这些数字,具体取决于底层线程休眠的时间。

我通过生成斐波那契数进行了一些简单的实验(通常用于模拟 CPU 工作负载)。然而,在考虑了 JVM 中的 JIT 之后,我无法轻松得出任何结果来证明Default调度程序性能更好。上下文切换可能并不像人们想象的那么重要。可能是调度程序没有为我的工作负载使用 IO 调度程序创建更多线程。可能我的实验也有缺陷。不能确定 - JVM 基准测试本身并不简单,添加协程(及其线程池)肯定不会让它变得更简单。

然而,我认为这里有更重要的事情需要考虑,那就是阻塞Default调度员对阻塞呼叫更加敏感。由于池中的线程较少,因此所有线程更有可能被阻塞,并且当时没有其他协程可以执行。

您的程序正在线程中运行。如果所有线程都被阻塞,那么您的程序就不会执行任何操作。创建新线程的成本很高(主要是内存方面的),因此对于具有阻塞功能的高负载系统来说,这成为一个限制因素。Kotlin 在引入“挂起”功能方面做得非常出色。程序的并发性不再局限于线程的数量。如果一个流需要等待,它只是挂起而不是阻塞线程。然而,“世界并不完美”,并不是所有的事情都“挂起”——仍然存在“阻塞”调用——你有多确定你使用的库没有在幕后执行此类调用?拥有权利的同时也被赋予了重大的责任。对于协程,需要更加小心死锁,尤其是在使用Default调度程序时。事实上,在我看来,IODispatcher 应该是默认的。


编辑

TL;DR:您可能实际上想创建自己的调度程序。

回想起来,我发现我的回答有些肤浅。从技术上讲,仅通过查看要运行的工作负载类型来决定使用哪个调度程序是不正确的。将 CPU 密集型工作负载限制到与 CPU 核心数量相匹配的调度程序确实可以优化吞吐量,但这并不是唯一的性能指标。

事实上,通过仅对Default所有 CPU 密集型工作负载使用,您可能会发现您的应用程序变得无响应!例如,假设我们有一个使用Default调度程序的“CPU 密集型”长时间运行的后台进程。现在,如果该进程使调度程序的线程池饱和Default,那么您可能会发现启动处理即时用户操作(用户单击或客户端请求)的协程需要等待后台进程首先完成!您已经实现了巨大的 CPU 吞吐量,但代价是延迟,并且应用程序的整体性能实际上下降了。

Kotlin 不会强制您使用预定义的调度程序。您始终可以为协程的特定任务创建定制的调度程序。

最终是关于:

  1. 平衡资源。您实际需要多少个线程?您有能力创建多少个线程?是 CPU 密集型还是 IO 密集型?即使它受 CPU 限制,您确定要将所有 CPU 资源分配给您的工作负载吗?
  2. 分配优先级。了解调度程序上运行的工作负载类型。也许某些工作负载需要立即运行,而另一些工作负载可能需要等待?
  3. 防止饥饿僵局。确保当前运行的协程不会阻塞等待同一调度程序中等待空闲线程的协程的结果。