Kotlin Process Collection并行?

Par*_*ker 3 collections parallel-processing kotlin kotlinx.coroutines

我有一组对象,我需要对它进行一些转换.目前我正在使用:

var myObjects: List<MyObject> = getMyObjects()

myObjects.forEach{ myObj ->
    someMethod(myObj)
}
Run Code Online (Sandbox Code Playgroud)

它工作正常,但我希望通过someMethod()并行运行来加速它,而不是等待每个对象完成,然后再开始下一个.

在Kotlin有什么办法吗?也许还有doAsyncTask什么?

我知道一年前问这个问题的时候是不可能的,但是现在Kotlin已经像doAsyncTask我一样好奇,如果有任何协同程序可以提供帮助的话

Ale*_*uss 11

是的,这可以使用协程来完成.以下函数在集合的所有元素上并行应用操作:

fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking {
    map { async(CommonPool) { f(it) } }.forEach { it.await() }
}
Run Code Online (Sandbox Code Playgroud)

虽然定义本身有点神秘,但您可以按照预期轻松应用它:

myObjects.forEachParallel { myObj ->
    someMethod(myObj)
}
Run Code Online (Sandbox Code Playgroud)

并行映射可以以类似的方式实现,请参阅/sf/answers/3205584371/.

  • 目前无法访问“CommonPool” - 它位于“kotlinx.coroutines”内部! (4认同)

tot*_*to2 5

Java Stream 在 Kotlin 中使用起来很简单:

tasks.stream().parallel().forEach { computeNotSuspend(it) }
Run Code Online (Sandbox Code Playgroud)

但是,如果您使用的是 Android,并且您希望应用程序与低于 24 的 API 兼容,则不能使用 Java 8。

您也可以按照您的建议使用协程。但截至目前(2017 年 8 月),它实际上并不是语言的一部分,您需要安装一个外部库。有很好的例子指南

    runBlocking<Unit> {
        val deferreds = tasks.map { async(CommonPool) { compute(it) } }
        deferreds.forEach { it.await() }
    }
Run Code Online (Sandbox Code Playgroud)

请注意,协程是使用非阻塞多线程实现的,这意味着它们可以比传统的多线程更快。我在下面的代码中对 Stream 并行与协程进行了基准测试,在这种情况下,协程方法在我的机器上快了 7 倍。 但是,您必须自己做一些工作以确保您的代码“挂起”(非锁定),这可能非常棘手。 在我的例子我打电话delay这是一个suspend由图书馆提供的功能。非阻塞多线程并不总是比传统多线程快。如果您有许多线程只等待 IO 什么都不做,它会更快,这就是我的基准测试正在做的事情。

我的基准测试代码:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.*
import kotlin.system.measureNanoTime
import kotlin.system.measureTimeMillis

class SomeTask() {
    val durationMS = random.nextInt(1000).toLong()

    companion object {
        val random = Random()
    }
}

suspend fun compute(task: SomeTask): Unit {
    delay(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun computeNotSuspend(task: SomeTask): Unit {
    Thread.sleep(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun main(args: Array<String>) {
    val n = 100
    val tasks = List(n) { SomeTask() }

    val timeCoroutine = measureNanoTime {
        runBlocking<Unit> {
            val deferreds = tasks.map { async(CommonPool) { compute(it) } }
            deferreds.forEach { it.await() }
        }
    }

    println("Coroutine ${timeCoroutine / 1_000_000} ms")

    val timePar = measureNanoTime {
        tasks.stream().parallel().forEach { computeNotSuspend(it) }
    }
    println("Stream parallel ${timePar / 1_000_000} ms")
}
Run Code Online (Sandbox Code Playgroud)

我的 4 核计算机上的输出:

Coroutine: 1037 ms
Stream parallel: 7150 ms
Run Code Online (Sandbox Code Playgroud)

如果取消注释println这两个compute函数中的 ,您将看到在非阻塞协程代码中,任务以正确的顺序处理,但不是使用 Streams。