如何有效地跨多个协程实现速率限制?

usb*_*102 5 kotlin kotlinx.coroutines

因此,假设我有一堆 couroutine 正在运行,它们与某些 Web 服务进行交互,并且由于我不想发送垃圾邮件,因此我想将请求限制为每 x 秒最多 1 个请求。为此,我可以使用一些这样的代码:

fun CoroutineScope.rateLimiter(tokens: SendChannel<Unit>, rate: Int) = launch {
    var lastToken = System.currentTimeMillis()
    while (isActive) {
        val currentTime = System.currentTimeMillis()
        if (currentTime - lastToken < rate) {
            delay(currentTime - lastToken)
        }
        tokens.send(Unit)
    }
}

fun CoroutineScope.request(tokens: ReceiveChannel<Unit>) = launch { 
    for (token in tokens) {
        //Do Web request
    }
}
Run Code Online (Sandbox Code Playgroud)

1.) 这样做有效吗?

2.) 这不能扩展到将某些内容限制为 x 字节/秒或我需要从Token Bucket请求 x 个令牌的地方,使用协程实现类似功能的最佳方法是什么?

Jen*_*ens 1

如果您想跳过对作业和渠道的依赖,这些作业和渠道可能会产生比消耗的许可证更多的许可证,然后一旦某个流程开始获取许可证,就会出现一群蜂拥而至的情况,也许这就是您的解决方案。

(这里有一些jvm风格,但可以替换为多平台)


import kotlin.math.max
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class SimpleRateLimiter(eventsPerSecond: Double) {

    private val mutex = Mutex()

    @Volatile
    private var next: Long = Long.MIN_VALUE
    private val delayNanos: Long = (1_000_000_000L / eventsPerSecond).toLong()

    /**
     * Suspend the current coroutine until it's calculated time of exit
     * from the rate limiter
     */
    suspend fun acquire() {
        val now: Long = System.nanoTime()
        val until = mutex.withLock {
            max(next, now).also {
                next = it + delayNanos
            }
        }
        if (until != now) {
            delay((until - now) / 1_000_000)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

它还伴随着其他的权衡。

  • 当 nanoTime 接近 Long.MAX_VALUE 时的行为肯定已损坏。
  • 没有 maxDelay/超时的暗示
  • 无法抓取多个白蚁
  • 没有 tryAquire 实现

如果您想要一个 IntervalLimiter 允许每 Y 秒发出 X 个请求,然后抛出异常,Resilience4J 中有 RateLimiter 或者如果您想要功能更齐全的东西,我正在开发一个 PR 来创建 RateLimiter 和协程核心项目中的 IntervalLimiter