Mar*_*nik 5 kotlin kotlinx.coroutines
自然又名。智能批处理是流处理中的一种技术,可在不影响延迟的情况下优化吞吐量。在并发队列的示例中,使用者可以自动消耗所有在某个瞬间观察到的项目,然后将它们作为批处理。理想情况下,应该对队列进行限制,为批处理大小提供上限,并同时向发送方提供背压。
之所以称为“自然”批次,是因为没有强加的批次大小:当流量较低时,它将在到达每个项目时立即对其进行处理。在这种情况下,您不需要通过将项目批处理在一起来进行任何吞吐量优化。当流量增加时,使用者将自动开始处理更大的批次,从而分摊单个操作(如数据库)的固定等待时间INSERT。
我写了一些实现基本目标的代码:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
const val batchLimit = 20
@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
handleItems: (List<T>) -> Unit
) {
val buf = mutableListOf<T>()
while (true) {
receiveOrNull()?.also { buf.add(it) } ?: break
for (x in 2..batchLimit) {
poll()?.also { buf.add(it) } ?: break
}
handleItems(buf)
buf.clear()
}
}
Run Code Online (Sandbox Code Playgroud)
我们可以这样测试:
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
val chan = generateMockTraffic()
runBlocking {
chan.consumeBatched { println("Received items: $it") }
}
}
@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
return GlobalScope.produce(capacity = batchLimit) {
(1..100).forEach {
send(it)
if (it % 10 == 0) {
delay(1)
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
consumeBatched()一次轮询队列中的一项,因此必须另外施加批次限制。如果针对并发队列(如Agrona项目的OneToOneConcurrentArrayQueue)编写并支持该drain操作,那将是最佳选择。
在图书馆的更多支持下,Kotlin渠道是否有更好的方法?
如果没有,是否将其视为要添加的功能?
在库的更多支持下,是否有更好的 Kotlin 通道方法?
该库不支持此功能。
如果没有,这会被视为要添加的功能吗?
这取决于所需的 API 表面。drainmember 不太可能适合通道语义:它限制实现,它应该以某种方式暴露漏极限制,并且它为通道提供更多“类似集合”的 API。drain例如,无限频道应该如何表现?是否可以以高效的方式实现一次drain(使用预先确定大小的缓冲区,但避免 OOM 和无限的集合)并将其与任何通道实现一起使用?
可以改进的是来自通道的额外提示,例如预期容量和排队元素的数量。它们可以通过默认实现拥有宽松的语义,并且可以像提示一样使用drain一些合理的可配置上限进行扩展。以后可以添加这样的API,欢迎创建issue