RxJava用新的observable无限重试

Hoo*_*oob 5 java control-flow event-bus vert.x rx-java

我正在使用Rx-ified API for vertx,这个问题必须做一个潜在的无限重试 - 直到成功循环我想实现但是有困难.我是RxJava的新手.

这是我想做的事情:

  1. 使用vertx消息总线向另一个vertx组件发送请求
  2. 只要我等待响应超时,请重新发出请求
  3. 一旦我对请求做出响应,检查结果,如果没有可用的,请等待一段时间,然后在步骤1重新开始

第一个问题

我遇到的第一个问题是如何完成步骤2).

如果您熟悉vert.x Rx api,这就是在上面的步骤1)中发出请求的意思:

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject );
Run Code Online (Sandbox Code Playgroud)

上面的代码返回一个Observable实例,该实例将发出响应或错误(例如,如果有超时).Observable将永远不会再发出任何东西(或者每次订阅时它总会发出完全相同的东西,我不知道哪个).

RxJava重试运算符似乎不起作用

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     .retry()
Run Code Online (Sandbox Code Playgroud)

我认为为了发出重试,我可以使用RxJava的retry()运算符,我尝试过,但由于源可观察的性质,这样做完全没有任何有用的效果.没有新的请求消息被发送,因为唯一被"重试"的是订阅原始源,它永远不会发出任何不同的东西.

RxJava重试当运算符似乎不起作用

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     .retryWhen( error -> {
        return _vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
      })
Run Code Online (Sandbox Code Playgroud)

所以我想我可以使用RxJava的retryWhen()运算符,它允许我在根observable发出错误时发出第二个observable.我想,第二个可观测量可能与上面第一步产生初始观察者的代码相同.

但是,retryWhen()运算符(请参阅文档)不允许第二个observable发出错误而不会因错误而结束订阅.

所以,我无法弄清楚如何在这个链的第一部分内建立一个潜在的无限重试循环.

我必须在这里遗漏一些东西,但我无法确定它是什么.

第二个问题

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     // imagine that retryWhen() accomplishes an infinite retry
     .retryWhen( error -> {
        return _vertx.eventBus().<JsonObject>sendObservable( ... )
      })
     .flatMap( response -> {
        // inspect response, if it has usable data,
        // return that data as an observable
        return Observable.from(response.data());

        // if the response has no usable data,
        // wait for some time, then start the whole process
        // all over again
        return Observable.timer(timeToWait).<WHAT GOES HERE?>;
     })
Run Code Online (Sandbox Code Playgroud)

第二个问题是如何实现第3步.这在我看来就像第一个问题,只是更难理解,因为我不需要重试直接的源可观察,我需要等待一点,然后重新开始在步骤1).

无论我创建什么Observable似乎都需要链中的所有元素导致这一点,这似乎是一种应该避免的递归.

在这一点上,我们非常欢迎任何帮助或建议.

Hoo*_*oob 4

非常感谢RxJava Google Group上的 Ben Christensen指出 defer() 运算符,它将在每个订阅上生成一个新的 Observable。然后可以将其与标准 retry() 运算符组合以获得无限重试。

因此,我的问题中第一个问题的最简单的解决方案如下所示:

Observable.defer( () -> vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) )
.retry()
Run Code Online (Sandbox Code Playgroud)

...如果需要指数退避,您可以在提供给 defer() 运算符的工厂方法中添加带有适当参数的 Observable.timer() 。

我仍在研究第二个问题。