我如何使用RxJS 5无损限制请求的等级

Ada*_*dam 5 javascript reactive-programming rxjs

我想使用向服务器发出一系列请求,但是服务器的硬速率限制为每秒10个请求。如果我尝试循环发出请求,那么它将达到速率限制,因为所有请求都将同时发生。

for(let i = 0; i < 20; i++) {
  sendRequest();
}
Run Code Online (Sandbox Code Playgroud)

ReactiveX有很多用于修改可观察流的工具,但是我似乎找不到实现速率限制的工具。我尝试添加标准延迟,但请求仍同时触发,仅比之前晚100ms。

const queueRequest$ = new Rx.Subject<number>();

queueRequest$
  .delay(100)
  .subscribe(queueData => {
    console.log(queueData);
  });

const queueRequest = (id) => queueRequest$.next(id);

function fire20Requests() {
  for (let i=0; i<20; i++) {
    queueRequest(i);
  }
}

fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);
Run Code Online (Sandbox Code Playgroud)

debounceTimethrottleTime运营商都相似,我正在寻找为好,但是这是不是无损有损。我想保留我提出的每个请求,而不是丢弃先前的请求。

...
queueRequest$
  .debounceTime(100)
  .subscribe(queueData => {
    sendRequest();
  });
...
Run Code Online (Sandbox Code Playgroud)

如何使用ReactiveX和Observables向服务器发出这些请求而又不超过速率限制?

car*_*ant 6

OP 的自我回答(以及链接的博客)中的实现总是会造成不太理想的延迟。

如果限速服务允许每秒 10 个请求,那么应该可以在 10 毫秒内发出 10 个请求,只要下一个请求在另外 990 毫秒内没有发出。

下面的实现应用了可变延迟以确保强制执行限制,并且延迟仅应用于会看到超出限制的请求。

function rateLimit(source, count, period) {

  return source
    .scan((records, value) => {

      const now = Date.now();
      const since = now - period;

      // Keep a record of all values received within the last period.

      records = records.filter((record) => record.until > since);
      if (records.length >= count) {

        // until is the time until which the value should be delayed.

        const firstRecord = records[0];
        const lastRecord = records[records.length - 1];
        const until = firstRecord.until + (period * Math.floor(records.length / count));

        // concatMap is used below to guarantee the values are emitted
        // in the same order in which they are received, so the delays
        // are cumulative. That means the actual delay is the difference
        // between the until times.

        records.push({
          delay: (lastRecord.until < now) ?
            (until - now) :
            (until - lastRecord.until),
          until,
          value
        });
      } else {
        records.push({
          delay: 0,
          until: now,
          value
        });
      }
      return records;

    }, [])
    .concatMap((records) => {

      const lastRecord = records[records.length - 1];
      const observable = Rx.Observable.of(lastRecord.value);
      return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
    });
}

const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 30),
  10,
  1000
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)