kar*_*ari 6 observable rxjs angular
我一直在尝试实现我自己的 RxJS 运算符,以便我可以将其“插入”到我的 Angular 的 HttpClient。我想让请求每 X 毫秒持续一次(轮询),但如果出现错误,我想使用某种增量策略重试请求。这是一个细分:
这是我到目前为止所拥有的:
function repeatWithBackoff<T>(delay: number, maxDelay = 60000) {
return (source: Observable<T>) =>
timer(0, delay).pipe(
concatMap(() => {
return source.pipe(
retryWhen((attempts) => {
return attempts.pipe(
concatMap((attempt, i) => {
const backoffDelay = Math.min(delay * Math.pow(2, i), maxDelay);
return timer(backoffDelay);
})
);
})
);
})
);
}
Run Code Online (Sandbox Code Playgroud)
我的使用方法如下:
httpClient.post(...)
.pipe(repeatWithBackoff(1000, 60000))
.subscribe((x) => console.log('Result', x));
Run Code Online (Sandbox Code Playgroud)
显然我的代码有点工作,但重试不能正常工作,我希望当请求失败时什么也不会发生(例如没有 console.log() ),但我可以看到打印到控制台的可观察值。另外,如果有人有一个聪明的想法来简化功能,我将不胜感激:)
现代 RxJS 通过使用接受非常强大的配置的内置retry和repeat运算符使这项任务变得简单。
import { pipe, retry, repeat, timer } from 'rxjs'
function pollWithBackoff<T> (delay: number, maxDelay: number): MonoTypeOperatorFunction<T> {
return pipe(
retry({
delay: (_error, i) => {const backoffDelay = Math.min(delay * Math.pow(2, i-1), maxDelay);
return timer(backoffDelay);}
}),
repeat({
delay
}),
)
}
Run Code Online (Sandbox Code Playgroud)
在 stackblitz 上查看它的实际应用。
创建复杂运算符的最佳方法是使用pipe函数。这使您能够像在.pipe方法中使用 RxJS 运算符一样编写它们。无需使用源可观察对象,在源可观察对象中您需要处理所有错误和完成并将它们发送到输出可观察对象。
首先,使用retry接受配置对象的运算符,您可以在其中提供自定义计时函数。从操作员开始非常重要retry,因为我们希望每次请求错误时都重新启动退避。注意:我不知道为什么,但似乎错误索引以i1 开头。
现在您有了“接受此请求并以指数退避重试”的逻辑。之后,添加repeat运算符,它再次接受带有delay. 这确保了前面的逻辑重复。因为我们已经处理了重试问题,所以repeat操作符的使用非常简单,并且仅在发出成功请求后才重复。