Kha*_*yar 1 collections kotlin kotlin-coroutines
我有一个永无止境的流作为序列。我的目标是根据时间和大小从序列中取出一批。
我的意思是,如果我的序列现在有 2250 条消息,我想发送 3 个批次(1000、1000、250)。
此外,如果直到接下来的 5 分钟,我仍然没有积累 1000 条消息,我无论如何都会将它发送到迄今为止积累的任何内容。
sequence
.chunked(1000)
.map { chunk ->
// do something with chunk
}
Run Code Online (Sandbox Code Playgroud)
我期待的是 .chunked(1000, 300) 之类的东西,当我想每 5 分钟发送一次时,300 是秒。
提前致谢
KotlinSequence是一个同步概念,不应以任何有时间限制的方式使用。如果您询问下一个元素的序列,那么它会阻塞调用者线程,直到它产生下一个元素并且无法取消它。
然而,kotlinx.coroutines库引入的概念Channel是异步世界序列的粗略模拟,其中操作可能需要一些时间才能完成,并且在执行此操作时不会阻塞线程。您可以在本指南中阅读更多内容。
它不提供现成的chunked运算符,但可以直接编写。您可以使用以下代码:
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
fun <T> ReceiveChannel<T>.chunked(size: Int, time: Long) =
produce<List<T>>(onCompletion = consumes()) {
while (true) { // this loop goes over each chunk
val chunk = mutableListOf<T>() // current chunk
val ticker = ticker(time) // time-limit for this chunk
try {
whileSelect {
ticker.onReceive {
false // done with chunk when timer ticks, takes priority over received elements
}
this@chunked.onReceive {
chunk += it
chunk.size < size // continue whileSelect if chunk is not full
}
}
} catch (e: ClosedReceiveChannelException) {
return@produce // that is normal exception when the source channel is over -- just stop
} finally {
ticker.cancel() // release ticker (we don't need it anymore as we wait for the first tick only)
if (chunk.isNotEmpty()) send(chunk) // send non-empty chunk on exit from whileSelect
}
}
}
Run Code Online (Sandbox Code Playgroud)
正如您从这段代码中看到的,它嵌入了一些关于在极端情况下该做什么的重要决定。如果定时器超时但当前块仍然是空的,我们该怎么办?此代码开始新的时间间隔并且不发送前一个(空)块。我们是在最后一个元素之后超时时完成当前块,从第一个元素开始测量时间,还是从块的开头开始测量时间?此代码稍后执行。
这段代码是完全顺序的——它的逻辑很容易一步一步地遵循(代码内部没有并发)。可以根据任何项目特定的要求对其进行调整。
| 归档时间: |
|
| 查看次数: |
1018 次 |
| 最近记录: |