Ada*_*dam 5 javascript reactive-programming rxjs
我想使用向服务器发出一系列请求,但是服务器的硬速率限制为每秒10个请求。如果我尝试循环发出请求,那么它将达到速率限制,因为所有请求都将同时发生。
for(let i = 0; i < 20; i++) {
sendRequest();
}
Run Code Online (Sandbox Code Playgroud)
ReactiveX有很多用于修改可观察流的工具,但是我似乎找不到实现速率限制的工具。我尝试添加标准延迟,但请求仍同时触发,仅比之前晚100ms。
const queueRequest$ = new Rx.Subject<number>();
queueRequest$
.delay(100)
.subscribe(queueData => {
console.log(queueData);
});
const queueRequest = (id) => queueRequest$.next(id);
function fire20Requests() {
for (let i=0; i<20; i++) {
queueRequest(i);
}
}
fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);
Run Code Online (Sandbox Code Playgroud)
在debounceTime和throttleTime运营商都相似,我正在寻找为好,但是这是不是无损有损。我想保留我提出的每个请求,而不是丢弃先前的请求。
...
queueRequest$
.debounceTime(100)
.subscribe(queueData => {
sendRequest();
});
...
Run Code Online (Sandbox Code Playgroud)
如何使用ReactiveX和Observables向服务器发出这些请求而又不超过速率限制?
OP 的自我回答(以及链接的博客)中的实现总是会造成不太理想的延迟。
如果限速服务允许每秒 10 个请求,那么应该可以在 10 毫秒内发出 10 个请求,只要下一个请求在另外 990 毫秒内没有发出。
下面的实现应用了可变延迟以确保强制执行限制,并且延迟仅应用于会看到超出限制的请求。
function rateLimit(source, count, period) {
return source
.scan((records, value) => {
const now = Date.now();
const since = now - period;
// Keep a record of all values received within the last period.
records = records.filter((record) => record.until > since);
if (records.length >= count) {
// until is the time until which the value should be delayed.
const firstRecord = records[0];
const lastRecord = records[records.length - 1];
const until = firstRecord.until + (period * Math.floor(records.length / count));
// concatMap is used below to guarantee the values are emitted
// in the same order in which they are received, so the delays
// are cumulative. That means the actual delay is the difference
// between the until times.
records.push({
delay: (lastRecord.until < now) ?
(until - now) :
(until - lastRecord.until),
until,
value
});
} else {
records.push({
delay: 0,
until: now,
value
});
}
return records;
}, [])
.concatMap((records) => {
const lastRecord = records[records.length - 1];
const observable = Rx.Observable.of(lastRecord.value);
return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
});
}
const start = Date.now();
rateLimit(
Rx.Observable.range(1, 30),
10,
1000
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1508 次 |
| 最近记录: |