Kotlin:如何异步等待相同方法的列表?

Ben*_*n H 5 kotlin

我有几百个 Java 类实例,它们都需要完成它们的 .calculate() 方法或在 10 分钟内死亡。它们会占用 CPU 和内存,所以我只想一次只允许 5 个(线程?)。我相信我很接近,但是来自 Java 背景我还不够熟悉 kotlin 协程(vs java ExecutorServices)来进行编译。

// ...my logic to create a stream of identical class type instances 
// that all have a vanilla blocking .calculate():Double method...
// which I now want to (maybe?) map to Jobs

listOf(MyClass(1), MyClass(2), MyClass(1000))
.map {
  launch(CommonPool) {
    val errorRate: Double? = it?.calculate()
    println("${it?.javaClass?.simpleName}  $errorRate") // desired output
    errorRate
  }
}
.collect(Collectors.toList<Job>())

jobs.forEach {
    println(it.join())
}
Run Code Online (Sandbox Code Playgroud)

然后我想我需要用非阻塞计算来包装计算?还是阻塞,但超时限制?那个“runBlocking”应该在那里吗?在上面的代码中作为 lambda 更好吗?

fun MyClass.calculateTimeLimited(): Double = runBlocking {
  withTimeout(TIMEOUT) {
    this.calculate() // <-- doesn't compile! "this" is "CoroutineScope"
Run Code Online (Sandbox Code Playgroud)

vod*_*dan 5

我的看法(抱歉无法在我的机器上测试):

val results = streamOfInstances.asSeqence().map {
    async(CommonPool) {
        val errorRate: Double? = it?.calculate()
        println("${it?.javaClass?.simpleName}  $errorRate")
        errorRate
    }
}

runBlocking {
    results.forEach {
        println(it.await())
    }
}  
Run Code Online (Sandbox Code Playgroud)

与您的代码的主要区别:

  • 我使用async而不是launch因为我正在收集结果
  • 阻塞操作(join()await())在里面runBlocking{}
  • 我使用 Kotlin 的 API map,而不是 JDK8 API


tot*_*to2 4

我不知道你是否知道,但有一个很棒的文档:coroutines by example。我链接了有关取消和超时的具体部分。以下是我对您的问题的实现:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.newSingleThreadContext
import java.util.*
import kotlin.system.measureNanoTime

internal val random = Random()

fun createRandArray(n: Long): Array<Int> {
    return createRandTArray(n).toTypedArray()
}

fun createRandTArray(n: Long): IntArray {
    return random.ints(n, 0, n.toInt()).toArray()
}

var size: Long = 1_000_000
var nRepeats: Int = 11
var nDrops: Int = 1

fun <T> benchmark(name: String,
                  buildSortable: (Long) -> T,
                  sort: (T) -> Any) {
    val arrays = List(nRepeats) { buildSortable(size) }
    val timesMS = arrays.map { measureNanoTime { sort(it) } / 1_000_000 }.drop(nDrops) // for JVM optimization warmup
    // println(timesMS)
    println("[$name] Average sort time for array of size $size: ${timesMS.average() } ms")
}

fun main(args: Array<String>) {
    size = 1_000_000
    nRepeats = 6

    benchmark("Array<Int>",
            buildSortable = { size -> createRandTArray(size).toTypedArray() },
            sort = ::mergeSort)

    benchmark("ListLike Array<Int>",
            buildSortable = { size -> SortingArray(createRandTArray(size).toTypedArray()) },
            sort = { array -> mergeSortLL(array) })  // oddly ::mergeSortLL is refused

    benchmark("ListLike IntArray",
            buildSortable = { size -> createRandTArray(size).asComparableList() },
            sort = { array -> mergeSortLL(array) })  // oddly ::mergeSortLL is refused

    benchmark("IntArray",
            buildSortable = { size -> createRandTArray(size) },
            sort = ::mergeSortIA)

    benchmark("IntArray multi-thread (CommonPool) with many array copies",
            buildSortable = { size -> createRandTArray(size) },
            sort = { mergeSortCorou(it) })

    val origContext = corouContext
    corouContext = newSingleThreadContext("single thread")
    benchmark("IntArray multi-thread (one thread!) with many array copies",
            buildSortable = { size -> createRandTArray(size) },
            sort = { mergeSortCorou(it) })
    corouContext = origContext

    benchmark("Java int[]",
            buildSortable = { size -> createRandTArray(size) },
            sort = { MergeSorter.sort(it) })
}
Run Code Online (Sandbox Code Playgroud)

我得到了输出:

 Hit the timeout (CancellationException).
 Out of 100 computations, 45 completed.
Run Code Online (Sandbox Code Playgroud)

您可以使用该timeOut值(当前为 500 毫秒)。每个作业都有不同的随机执行时间(从 0 到 1000 毫秒),因此大约一半会在超时之前执行。

但是,您可能会更难针对您的特定问题实施此操作。您的计算必须是可取消的。仔细阅读我上面链接的文档中有关取消的部分。基本上,您的计算要么必须调用suspend中的函数之一kotlinx.coroutines(这就是我调用 后所做的事情delay),要么使用yieldor isActive


编辑:与取消任何工作的评论相关(不可取消/不可暂停):

不,这里没有魔法。无论您使用什么框架,使计算真正可取消都是非常困难的。众所周知,Java 有Thread.stop(),它似乎可以实现您想要的功能,但已被弃用

我尝试使用协程来解决超时后停止提交新作业的更简单问题,但是在超时之前启动的作业可以运行远远超过超时时间而不会被取消/中断。我花了一些时间,但找不到一个简单的协程解决方案。这可以使用标准 Java 并发结构来完成。