RxJS 轮询+使用退避策略重试

kar*_*ari 6 observable rxjs angular

我一直在尝试实现我自己的 RxJS 运算符,以便我可以将其“插入”到我的 Angular 的 HttpClient。我想让请求每 X 毫秒持续一次(轮询),但如果出现错误,我想使用某种增量策略重试请求。这是一个细分:

  1. 每 X 毫秒一次又一次重复请求(轮询)
  2. 如果出现错误,请求应每 Y 毫秒重试一次,但如果请求再次失败,则毫秒数应加倍(例如,第一次失败 1000 毫秒,第二次 2000 毫秒,第三次 4000 毫秒),直到达到限制,例如1 分钟,则重试次数不会再次增加,所有后续重试都应等待 1 分钟。
  3. 重试成功后,重试毫秒计数器应重新启动,以便下次从Y毫秒开始重试。
  4. 当执行一个无论出于何种原因需要花费大量时间来响应的请求时,我们需要确保没有其他请求被触发或取消(我可以在这里看到 concatMap 的可能用途!)

这是我到目前为止所拥有的:

function repeatWithBackoff<T>(delay: number, maxDelay = 60000) {
  return (source: Observable<T>) =>
    timer(0, delay).pipe(
      concatMap(() => {
        return source.pipe(
          retryWhen((attempts) => {
            return attempts.pipe(
              concatMap((attempt, i) => {
                const backoffDelay = Math.min(delay * Math.pow(2, i), maxDelay);
                return timer(backoffDelay);
              })
            );
          })
        );
      })
    );
}
Run Code Online (Sandbox Code Playgroud)

我的使用方法如下:

httpClient.post(...)
  .pipe(repeatWithBackoff(1000, 60000))
  .subscribe((x) => console.log('Result', x));
Run Code Online (Sandbox Code Playgroud)

显然我的代码有点工作,但重试不能正常工作,我希望当请求失败时什么也不会发生(例如没有 console.log() ),但我可以看到打印到控制台的可观察值。另外,如果有人有一个聪明的想法来简化功能,我将不胜感激:)

kve*_*tis 2

现代 RxJS 通过使用接受非常强大的配置的内置retryrepeat运算符使这项任务变得简单。

import { pipe, retry, repeat, timer } from 'rxjs'

function pollWithBackoff<T> (delay: number, maxDelay: number): MonoTypeOperatorFunction<T> {
  return pipe(
    retry({
      delay: (_error, i) => {const backoffDelay = Math.min(delay * Math.pow(2, i-1), maxDelay);
      return timer(backoffDelay);}
    }),
    repeat({
      delay
    }),
  )
}
Run Code Online (Sandbox Code Playgroud)

在 stackblitz 上查看它的实际应用。

创建复杂运算符的最佳方法是使用pipe函数。这使您能够像在.pipe方法中使用 RxJS 运算符一样编写它们。无需使用源可观察对象,在源可观察对象中您需要处理所有错误和完成并将它们发送到输出可观察对象。

首先,使用retry接受配置对象的运算符,您可以在其中提供自定义计时函数。从操作员开始非常重要retry,因为我们希望每次请求错误时都重新启动退避。注意:我不知道为什么,但似乎错误索引以i1 开头。

现在您有了“接受此请求并以指数退避重试”的逻辑。之后,添加repeat运算符,它再次接受带有delay. 这确保了前面的逻辑重复。因为我们已经处理了重试问题,所以repeat操作符的使用非常简单,并且仅在发出成功请求后才重复。

有关配置对象的更多信息,请参阅重试文档、重复文档。