如何使用协程延迟迭代大型(大于内存)Kotlin 序列

Ada*_*dam 3 multithreading asynchronous sequence kotlin kotlin-coroutines

我有一个延迟生成的大序列,并且太大而无法放入内存中。

我想使用协程来处理这个序列以提高性能,在本例中我使用 10 个并行线程进行处理。

            runBlocking(Dispatchers.IO.limitedParallelism(10)) {
                massiveLazySequenceOfThings.forEach { myThingToProcess ->
                    print("I am processing $myThingToProcess")
                    launch() {
                        process(myThingToProcess)
                    }
                }
            }
Run Code Online (Sandbox Code Playgroud)

这里的问题是,第一个打印语句将对序列中的每个项目执行,因此对于像我这样的非常大的序列,这将导致 OOM。

在这个例子中,有没有办法让我的序列的迭代变得“惰性”,以便在任何时候只处理固定的数字?

我是否被迫在此处使用通道(可能使用缓冲通道?)在序列迭代期间强制执行阻塞调用,直到处理某些项目?或者我还缺少其他一些更清洁的解决方案吗?

在我的实际示例中,我还使用supervisorScope来监视每个处理作业,因此如果可能的话我也想保留它。

IR4*_*R42 6

您可以使用 限制并行操作的数量Semaphore

runBlocking(Dispatchers.IO) {
    val semaphore = Semaphore(permits = 10)
    massiveLazySequenceOfThings.forEach { myThingToProcess ->
        semaphore.acquire()
        print("I am processing $myThingToProcess")
        launch {
            try {
                process(myThingToProcess)
            } finally {
                print("I am done processing $myThingToProcess")
                semaphore.release()
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)