如何添加超时以检测 Observable 有一段时间没有发出

iam*_*ini 6 rx-java

我的用例如下:

我创建了一个 Observable 来访问远程服务器以获取一些数据。然而,由于服务器设计不善,可能永远没有响应,也没有例外。为了解决这个问题,我想要一些超时重试机制。

目前我尝试启动一个计时器来停止请求并在其中抛出异常,然后重试直到一定次数的尝试或我的真正超时。我尝试使用mergeWith运算符将请求与Observable.interval映射合并以使用生成错误Observable.error(),但是我无法在订阅者中捕获错误,并且看起来Observable.interval永无止境。

我应该如何处理 RXJAVA 中的任何操作员的这种情况?

我当前的代码如下所示:

Observable.fromEmitter(fetchNetwork->...)
.mergeWith(Observable.interval(...)
           .flatmap(n->(observable.error)))
.retryWhen(error->(checkTimeExceed))
.subscribe(handleResult)
Run Code Online (Sandbox Code Playgroud)

yos*_*riz 6

您可以将timeout()运算符与retryWhen()

Observable.fromEmitter(fetchNetwork->...)
     .timeout(TIMEOUT_VALUE, TimeUnit.SECONDS)
     .retryWhen(observable -> observable.flatMap(error -> {
                if (error instanceof TimeoutException) {
                    return Observable.just(new Object());
                } else {
                    return Observable.error(error);
                }
            }))
     .subscribe(handleResult)
Run Code Online (Sandbox Code Playgroud)

这将在 TIMEOUT_VALUE 秒后超时请求,并且只要请求超时就会重试,其他错误将像往常一样传播给订阅者onError()