Kotlin系列的并行操作?

HRJ*_*HRJ 31 parallel-processing kotlin

在Scala中,可以轻松地执行并行映射,forEach等,其中:

collection.par.map(..)
Run Code Online (Sandbox Code Playgroud)

在Kotlin有同等学历吗?

yol*_*ole 38

Kotlin标准库不支持并行操作.但是,由于Kotlin使用标准Java集合类,因此您也可以使用Java 8流API在Kotlin集合上执行并行操作.

例如

myCollection.parallelStream()
        .map { ... }
        .filter { ... }
Run Code Online (Sandbox Code Playgroud)

  • 如何在Kotlin中使用Java 8流API? (2认同)
  • @LordScone就像你在Java中一样.例如:`myCollection.parallelStream().map {...}.过滤{...}` (2认同)

Ale*_*uss 31

从Kotlin 1.1开始,并行操作在协同程序方面也可以表现得非常优雅.这是pmap列表:

fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(CommonPool) { f(it) } }.map { it.await() }
}
Run Code Online (Sandbox Code Playgroud)

请注意,协同程序仍然是一个实验性功能.

  • 很优雅吗?相反,我会说代码很难读。 (10认同)
  • 有了Kotlin 1.3,这还是最好的答案吗?我注意到@ OlivierTerrien的Stream回答如下,但我更喜欢坚持使用Kotlin Sequences和Iterables. (3认同)
  • @DzmitryLazerka 我想我知道你来自哪里,但这个确切的代码并不是优雅的一点。这段代码的使用很优雅。如果上面的方法放在某个地方,它可以只与 `foo.pmap { v -&gt; ... }` 一起使用。我认为这是相当优雅的。 (2认同)

小智 13

您可以使用此扩展方法:

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅Kotlin 中的 Parallel Map


Hol*_*ndl 12

Kotlin的stdlib还没有官方支持,但您可以定义一个扩展函数来模仿par.map:

fun <T, R> Iterable<T>.pmap(
          numThreads: Int = Runtime.getRuntime().availableProcessors() - 2, 
          exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
          transform: (T) -> R): List<R> {

    // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
    val defaultSize = if (this is Collection<*>) this.size else 10
    val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))

    for (item in this) {
        exec.submit { destination.add(transform(item)) }
    }

    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)

    return ArrayList<R>(destination)
}
Run Code Online (Sandbox Code Playgroud)

(github来源)

这是一个简单的用法示例

val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }
Run Code Online (Sandbox Code Playgroud)

如果需要,它允许通过提供线程数甚至特定线程来调整线程java.util.concurrent.Executor.例如

listOf("foo", "bar").pmap(4, transform = { it + "!" })
Run Code Online (Sandbox Code Playgroud)

请注意,这种方法只允许并行化map操作,不会影响任何下游位.例如,filter在第一个示例中将运行单线程.但是,在许多情况下,只是数据转换(即map)需要并行化.此外,将方法从上面扩展到Kotlin集合API的其他元素是很简单的.


Oli*_*ien 9

从1.2版本开始,kotlin添加了一个符合JRE8 的流功能

因此,异步迭代列表可以像下面这样完成:

fun main(args: Array<String>) {
  val c = listOf("toto", "tata", "tutu")
  c.parallelStream().forEach { println(it) }
}
Run Code Online (Sandbox Code Playgroud)


Lam*_*sti 5

Kotlin 希望是惯用的,但又不想过于综合以至于乍一看难以理解。

通过 Coroutines 进行并行计算也不例外。他们希望使用一些预先构建的方法简单但不隐含,允许在需要时分支计算。

在你的情况下:

collection.map { 
        async{ produceWith(it) } 
    }
    .forEach { 
        consume(it.await()) 
    }
Run Code Online (Sandbox Code Playgroud)

请注意,要调用async并且await您需要在所谓的 中Context,您不能从非协程上下文中进行挂起调用或启动协程。要输入一个,您可以:

  • runBlocking { /* your code here */ }:它将挂起当前线程,直到 lambda 返回。
  • GlobalScope.launch { }:它将并行执行 lambda;如果您main完成执行而您的协程还没有发生不好的事情,在这种情况下最好使用runBlocking.

希望它可以帮助:)