usb*_*102 5 kotlin kotlinx.coroutines
因此,假设我有一堆 couroutine 正在运行,它们与某些 Web 服务进行交互,并且由于我不想发送垃圾邮件,因此我想将请求限制为每 x 秒最多 1 个请求。为此,我可以使用一些这样的代码:
fun CoroutineScope.rateLimiter(tokens: SendChannel<Unit>, rate: Int) = launch {
var lastToken = System.currentTimeMillis()
while (isActive) {
val currentTime = System.currentTimeMillis()
if (currentTime - lastToken < rate) {
delay(currentTime - lastToken)
}
tokens.send(Unit)
}
}
fun CoroutineScope.request(tokens: ReceiveChannel<Unit>) = launch {
for (token in tokens) {
//Do Web request
}
}
Run Code Online (Sandbox Code Playgroud)
1.) 这样做有效吗?
2.) 这不能扩展到将某些内容限制为 x 字节/秒或我需要从Token Bucket请求 x 个令牌的地方,使用协程实现类似功能的最佳方法是什么?
如果您想跳过对作业和渠道的依赖,这些作业和渠道可能会产生比消耗的许可证更多的许可证,然后一旦某个流程开始获取许可证,就会出现一群蜂拥而至的情况,也许这就是您的解决方案。
(这里有一些jvm风格,但可以替换为多平台)
import kotlin.math.max
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
class SimpleRateLimiter(eventsPerSecond: Double) {
private val mutex = Mutex()
@Volatile
private var next: Long = Long.MIN_VALUE
private val delayNanos: Long = (1_000_000_000L / eventsPerSecond).toLong()
/**
* Suspend the current coroutine until it's calculated time of exit
* from the rate limiter
*/
suspend fun acquire() {
val now: Long = System.nanoTime()
val until = mutex.withLock {
max(next, now).also {
next = it + delayNanos
}
}
if (until != now) {
delay((until - now) / 1_000_000)
}
}
}
Run Code Online (Sandbox Code Playgroud)
它还伴随着其他的权衡。
如果您想要一个 IntervalLimiter 允许每 Y 秒发出 X 个请求,然后抛出异常,Resilience4J 中有 RateLimiter 或者如果您想要功能更齐全的东西,我正在开发一个 PR 来创建 RateLimiter 和协程核心项目中的 IntervalLimiter。
归档时间: |
|
查看次数: |
534 次 |
最近记录: |