如何通过Kotlin渠道实施自然(又称智能)批处理?

Mar*_*nik 5 kotlin kotlinx.coroutines

自然又名。智能批处理是流处理中的一种技术,可在不影响延迟的情况下优化吞吐量。在并发队列的示例中,使用者可以自动消耗所有在某个瞬间观察到的项目,然后将它们作为批处理。理想情况下,应该对队列进行限制,为批处理大小提供上限,并同时向发送方提供背压。

之所以称为“自然”批次,是因为没有强加的批次大小:当流量较低时,它将在到达每个项目时立即对其进行处理。在这种情况下,您不需要通过将项目批处理在一起来进行任何吞吐量优化。当流量增加时,使用者将自动开始处理更大的批次,从而分摊单个操作(如数据库)的固定等待时间INSERT

我写了一些实现基本目标的代码:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

const val batchLimit = 20

@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
        handleItems: (List<T>) -> Unit
) {
    val buf = mutableListOf<T>()
    while (true) {
        receiveOrNull()?.also { buf.add(it) } ?: break
        for (x in 2..batchLimit) {
            poll()?.also { buf.add(it) } ?: break
        }
        handleItems(buf)
        buf.clear()
    }
}
Run Code Online (Sandbox Code Playgroud)

我们可以这样测试:

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val chan = generateMockTraffic()
    runBlocking {
        chan.consumeBatched { println("Received items: $it") }
    }
}

@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
    return GlobalScope.produce(capacity = batchLimit) {
        (1..100).forEach {
            send(it)
            if (it % 10 == 0) {
                delay(1)
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

consumeBatched()一次轮询队列中的一项,因此必须另外施加批次限制。如果针对并发队列(如Agrona项目的OneToOneConcurrentArrayQueue)编写并支持该drain操作,那将是最佳选择。

在图书馆的更多支持下,Kotlin渠道是否有更好的方法?

如果没有,是否将其视为要添加的功能?

qww*_*sad 2

在库的更多支持下,是否有更好的 Kotlin 通道方法?

该库不支持此功能。

如果没有,这会被视为要添加的功能吗?

这取决于所需的 API 表面。drainmember 不太可能适合通道语义:它限制实现,它应该以某种方式暴露漏极限制,并且它为通道提供更多“类似集合”的 API。drain例如,无限频道应该如何表现?是否可以以高效的方式实现一次drain(使用预先确定大小的缓冲区,但避免 OOM 和无限的集合)并将其与任何通道实现一起使用?

可以改进的是来自通道的额外提示,例如预期容量和排队元素的数量。它们可以通过默认实现拥有宽松的语义,并且可以像提示一样使用drain一些合理的可配置上限进行扩展。以后可以添加这样的API,欢迎创建issue