RxJava:重试时有重试限制

Ree*_*ezy 10 couchbase rx-java

我是ReactiveX和反应式编程的新手.我需要为Couchbase CAS操作实现重试机制,但Couchbase网站上的示例显示重试时,它似乎无限期地重试.我需要在那里的某个地方有一个重试限制和重试计数.

简单的retry()可以工作,因为它接受retryLimit,但我不希望它只在CASMismatchException上重试每个异常.

有任何想法吗?我正在使用RxJava库.

das*_*chl 9

除了Simon Basle所说的,这是一个带线性退避的快速版本:

.retryWhen(notification ->
    notification
    .zipWith(Observable.range(1, 5), Tuple::create)
    .flatMap(att ->
            att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(att.value2(), TimeUnit.SECONDS)
    )
)
Run Code Online (Sandbox Code Playgroud)

请注意,这里的"att"是一个元组,它由throwable和重试次数组成,因此您可以非常具体地实现基于这两个参数的返回逻辑.

如果你想学习更多,你可以看看我正在写的弹性文档:https://gist.github.com/daschl/db9fcc9d2b932115b679#retry-with-delay


Sim*_*slé 6

retryWhen 显然比简单的重试复杂一点,但这里是它的要点:

  • 你将一个notificationHandler函数传递给 retryWhen ,它接受一个Observable<Throwable>并输出一个Observable<?>
  • 这个返回的 Observable 的发射决定什么时候重试应该发生或停止
  • 因此,对于原始流中出现的每个异常,如果处理程序发出 1 个项目,则将有 1 次重试。如果它发出 2 个项目,就会有 2 个......
  • 一旦处理程序的流发出错误,重试就会中止。

使用它,您可以:

  • 仅在CasMismatchExceptions以下Observable.error(t)情况下工作:让您的函数在其他情况下返回一个
  • 仅重试特定次数:对于每个异常,从Observable.range代表最大重试次数的flatMap返回一个Observable.timer,如果您需要增加延迟,请使用重试 #返回。

您的用例与此处的RxJava 文档中的用例非常接近