Par*_*ker 3 collections parallel-processing kotlin kotlinx.coroutines
我有一组对象,我需要对它进行一些转换.目前我正在使用:
var myObjects: List<MyObject> = getMyObjects()
myObjects.forEach{ myObj ->
someMethod(myObj)
}
Run Code Online (Sandbox Code Playgroud)
它工作正常,但我希望通过someMethod()并行运行来加速它,而不是等待每个对象完成,然后再开始下一个.
在Kotlin有什么办法吗?也许还有doAsyncTask什么?
我知道一年前问这个问题的时候是不可能的,但是现在Kotlin已经像doAsyncTask我一样好奇,如果有任何协同程序可以提供帮助的话
Ale*_*uss 11
是的,这可以使用协程来完成.以下函数在集合的所有元素上并行应用操作:
fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking {
map { async(CommonPool) { f(it) } }.forEach { it.await() }
}
Run Code Online (Sandbox Code Playgroud)
虽然定义本身有点神秘,但您可以按照预期轻松应用它:
myObjects.forEachParallel { myObj ->
someMethod(myObj)
}
Run Code Online (Sandbox Code Playgroud)
并行映射可以以类似的方式实现,请参阅/sf/answers/3205584371/.
Java Stream 在 Kotlin 中使用起来很简单:
tasks.stream().parallel().forEach { computeNotSuspend(it) }
Run Code Online (Sandbox Code Playgroud)
但是,如果您使用的是 Android,并且您希望应用程序与低于 24 的 API 兼容,则不能使用 Java 8。
您也可以按照您的建议使用协程。但截至目前(2017 年 8 月),它实际上并不是语言的一部分,您需要安装一个外部库。有很好的例子指南。
runBlocking<Unit> {
val deferreds = tasks.map { async(CommonPool) { compute(it) } }
deferreds.forEach { it.await() }
}
Run Code Online (Sandbox Code Playgroud)
请注意,协程是使用非阻塞多线程实现的,这意味着它们可以比传统的多线程更快。我在下面的代码中对 Stream 并行与协程进行了基准测试,在这种情况下,协程方法在我的机器上快了 7 倍。 但是,您必须自己做一些工作以确保您的代码“挂起”(非锁定),这可能非常棘手。 在我的例子我打电话delay这是一个suspend由图书馆提供的功能。非阻塞多线程并不总是比传统多线程快。如果您有许多线程只等待 IO 什么都不做,它会更快,这就是我的基准测试正在做的事情。
我的基准测试代码:
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.*
import kotlin.system.measureNanoTime
import kotlin.system.measureTimeMillis
class SomeTask() {
val durationMS = random.nextInt(1000).toLong()
companion object {
val random = Random()
}
}
suspend fun compute(task: SomeTask): Unit {
delay(task.durationMS)
//println("done ${task.durationMS}")
return
}
fun computeNotSuspend(task: SomeTask): Unit {
Thread.sleep(task.durationMS)
//println("done ${task.durationMS}")
return
}
fun main(args: Array<String>) {
val n = 100
val tasks = List(n) { SomeTask() }
val timeCoroutine = measureNanoTime {
runBlocking<Unit> {
val deferreds = tasks.map { async(CommonPool) { compute(it) } }
deferreds.forEach { it.await() }
}
}
println("Coroutine ${timeCoroutine / 1_000_000} ms")
val timePar = measureNanoTime {
tasks.stream().parallel().forEach { computeNotSuspend(it) }
}
println("Stream parallel ${timePar / 1_000_000} ms")
}
Run Code Online (Sandbox Code Playgroud)
我的 4 核计算机上的输出:
Coroutine: 1037 ms
Stream parallel: 7150 ms
Run Code Online (Sandbox Code Playgroud)
如果取消注释println这两个compute函数中的 ,您将看到在非阻塞协程代码中,任务以正确的顺序处理,但不是使用 Streams。
| 归档时间: |
|
| 查看次数: |
3574 次 |
| 最近记录: |