Ada*_*dam 3 multithreading asynchronous sequence kotlin kotlin-coroutines
我有一个延迟生成的大序列,并且太大而无法放入内存中。
我想使用协程来处理这个序列以提高性能,在本例中我使用 10 个并行线程进行处理。
runBlocking(Dispatchers.IO.limitedParallelism(10)) {
massiveLazySequenceOfThings.forEach { myThingToProcess ->
print("I am processing $myThingToProcess")
launch() {
process(myThingToProcess)
}
}
}
Run Code Online (Sandbox Code Playgroud)
这里的问题是,第一个打印语句将对序列中的每个项目执行,因此对于像我这样的非常大的序列,这将导致 OOM。
在这个例子中,有没有办法让我的序列的迭代变得“惰性”,以便在任何时候只处理固定的数字?
我是否被迫在此处使用通道(可能使用缓冲通道?)在序列迭代期间强制执行阻塞调用,直到处理某些项目?或者我还缺少其他一些更清洁的解决方案吗?
在我的实际示例中,我还使用supervisorScope来监视每个处理作业,因此如果可能的话我也想保留它。
您可以使用 限制并行操作的数量Semaphore
。
runBlocking(Dispatchers.IO) {
val semaphore = Semaphore(permits = 10)
massiveLazySequenceOfThings.forEach { myThingToProcess ->
semaphore.acquire()
print("I am processing $myThingToProcess")
launch {
try {
process(myThingToProcess)
} finally {
print("I am done processing $myThingToProcess")
semaphore.release()
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
525 次 |
最近记录: |