Kotlin协程-在运行阻塞中使用主线程

Wit*_*upś 5 kotlin kotlin-coroutines

我正在尝试执行以下代码:

 val jobs = listOf(...)
 return runBlocking(CommonPool) {
    val executed = jobs.map {
        async { it.execute() }
    }.toTypedArray()
    awaitAll(*executed)
 }
Run Code Online (Sandbox Code Playgroud)

jobs某些Suppliers 的列表在哪里-在同步世界中,这应该只创建例如int列表。一切正常,但问题是未使用主线程。YourKit的Bellow屏幕截图: 在此处输入图片说明

所以,问题是-我又如何利用主线程?

我想runBlocking这里是问题,但是还有其他方法可以收到相同的结果吗?使用Java并行流,它看起来要好得多,但是主线程仍未完全利用(任务是完全独立的)。

更新

好吧,也许我告诉你的东西太少了。在观看Vankant Subramaniam的演讲后不久,我的问题就来了:https ://youtu.be/0hQvWIdwnw4 。我需要最高的性能,没有IO,没有Ui等。只有计算。只有请求,我需要使用所有可用资源。

我的一个想法是将Paralleizm设置为线程数+ 1,但是我认为这很愚蠢。

Mar*_*nik 2

我使用 Java 8 并行流测试了该解决方案:

jobs.parallelStream().forEach { it.execute() }
Run Code Online (Sandbox Code Playgroud)

我发现 CPU 利用率可靠地达到 100%。作为参考,我使用了这个计算作业:

class MyJob {
    fun execute(): Double {
        val rnd = ThreadLocalRandom.current()
        var d = 1.0
        (1..rnd.nextInt(1_000_000)).forEach { _ ->
            d *= 1 + rnd.nextDouble(0.0000001)
        }
        return d
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,其持续时间从零到执行 100,000,000 次 FP 乘法所需的时间随机变化。

出于好奇,我还研究了您添加到问题中的代码作为适合您的解决方案。我发现它存在很多问题,例如:

  • 将所有结果累积到一个列表中,而不是在它们可用时对其进行处理
  • 提交最后一个作业后立即关闭结果通道,而不是等待所有结果

我自己编写了一些代码,并添加了代码来对 Stream API 单行进行基准测试。这里是:

const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }


fun parallelStream(): Double =
        jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })

fun channels(): Double {
    val resultChannel = Channel<Double>(UNLIMITED)

    val mainComputeChannel = Channel<MyJob>()
    val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
        GlobalScope.actor<MyJob>(Dispatchers.Default) {
            for (job in channel) {
                job.execute().also { resultChannel.send(it) }
            }
        }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that submits the jobs
    GlobalScope.launch {
        jobs.forEach { job ->
            select {
                allComputeChannels.forEach { chan ->
                    chan.onSend(job) {}
                }
            }
        }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_JOBS) {
            select<Unit> {
                mainComputeChannel.onReceive { job ->
                    job.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        sum
    }
}

fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
}

fun measure(task: String, measuredCode: () -> Double) {
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { _ -> block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
}
Run Code Online (Sandbox Code Playgroud)

这是我的典型结果:

Parallel Stream took 396.85 ms
Channels took 398.1 ms
Run Code Online (Sandbox Code Playgroud)

结果相似,但一行代码仍然胜过 50 行代码:)