如何使用RxJS重试超时持续时间?

Bra*_*don 9 reactive-programming rxjs rxjs5 typescript2.0 angular

我正在使用最新的Angular和Typescript以及RxJS 5.

Angular目前使RxJS成为必需品.我已经使用C#超过10年了,我非常习惯Linq/Lambdas /流利的语法,我认为它构成了Reactive的基础.

我想在重试时使用越来越大的超时值进行Http get调用,但是我遇到了一个问题,看看如何做到并仍然保持管道中的所有内容(不使用外部状态).

我知道我可以这样做,但它只会使用相同的超时值重试.

myHttpObservable.timeout(1000).retry(2);
Run Code Online (Sandbox Code Playgroud)

RxJS的文档在很多地方都很差,在这里询问它只是删除了我的问题,这很难过......所以我被迫查看源代码.

有没有办法以每次增加超时持续时间的方式重试,以保持状态在管道中?此外,我想在第一次尝试时保留一个初始超时.

我一开始尝试过与此类似的东西,但实现了令人困惑的重试当操作符并非真正用于我想要的东西时:

myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => {
 return  aNewMyObservableCreatedinHere.timeout(2000);  
});
Run Code Online (Sandbox Code Playgroud)

我知道我可以使用外部状态来实现这一点,但我基本上都在寻找一种优雅的解决方案,我认为,这是他们用反应式编程方式所驱动的.

Mar*_*ten 11

目前RxJs5最大的问题之一就是文档.它确实是碎片化的,与之前的版本不相上下.通过查看RxJs4的文档,您可以看到.retryWhen() 已经有一个构建指数退避的示例,可以轻松迁移到RxJs5:

Rx.Observable.throw(new Error('splut'))
  .retryWhen(attempts => Rx.Observable.range(1, 3)
    .zip(attempts, i => i)
    .mergeMap(i => {
      console.log("delay retry by " + i + " second(s)");
      return Rx.Observable.timer(i * 1000);
    })
  ).subscribe();
Run Code Online (Sandbox Code Playgroud)


Mar*_*ten 6

根据我之前回答的评论中的其他输入,我一直在与.expand()操作员进行试验来解决您的要求:

我想使用超时持续时间 = X 进行获取,如果超时,则使用 timeout = X+1 重试

以下代码段以 timeout = 500 开始,每次尝试超时都会增加尝试 * 500,直到达到maxAttempts或成功检索结果:

getWithExpandingTimeout('http://reaches-max-attempts', 3)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );

getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );



/*
retrieve the given url and keep trying with an expanding 
timeout until it succeeds or maxAttempts has been reached
*/
function getWithExpandingTimeout(url, maxAttempts) {
  return get(url, 1)
    .expand(res => {
      if(res.error) {
        const nextAttempt = 1 + res.attempt;
        if(nextAttempt > maxAttempts) {
          // too many retry attempts done
          return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`));
        }
        
        // retry next attempt
        return get(url, nextAttempt);
      }
      return Rx.Observable.empty(); // done with retrying, result has been emitted
    })
    .filter(res => !res.error);
}

/*
 retrieve info from server with timeout based on attempt
 NOTE: does not errors the stream so expand() keeps working
*/
function get(url, attempt) {
  return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`)
    .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`))
    .delay(5 * 500)
    .timeout(attempt * 500)
    .catch(error => Rx.Observable.of({ attempt: attempt, error }));
}
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)

这是如何运作的

上游或.expand()操作员产生的每个值都被发送到下游并用作.expand()操作员的输入。这一直持续到没有进一步的值被发出。通过利用这种行为,.get()当发射包含一个.error值时,函数会随着尝试次数的增加而重新尝试。

.get()函数不会抛出错误,否则我们需要在 中捕获它,.expand()否则递归会意外中断。

当超过maxAttempts 时.expand()操作员会抛出一个错误,停止递归。当.error发射中不存在任何属性时,我们希望这是一个成功的结果并发射一个空的 Observable,停止递归。

注意:它用于.filter()删除基于.error属性的所有排放,因为 产生的所有值.expand()也会向下游排放,但这些.error值是内部状态。


Ale*_*hko 6

backoff-rxjsnpm软件包中有一个运算符来处理这种情况retryBackoff。我在blog.angularindepth.com上的文章中对此进行了描述,但总的来说是这样的:

source.pipe(
  retryWhen(errors => errors.pipe(
      concatMap((error, iteration) => 
          timer(Math.pow(2, iteration) * initialInterval, maxInterval))));
Run Code Online (Sandbox Code Playgroud)

这是可定制性更高的版本的来源