Sel*_*bor 3 java java-8 rx-java exponential-backoff
我有一个 API,它需要Observable
触发一个事件。
我想返回一个,如果检测到互联网连接,则每秒Observable
发出一个值,如果没有连接,则延迟时间。defaultDelay
numberOfFailedAttempts^2
我尝试了很多不同的风格,我遇到的最大问题是retryWhen's
observable 只评估一次:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
Run Code Online (Sandbox Code Playgroud)
有什么办法可以做我想做的事情吗?我发现了一个相关的问题(现在无法找到它搜索),但所采取的方法似乎不适用于动态值。
您的代码中有两个错误:
interval
您最好使用类似just
, 或fromCallable
像我在下面的示例中所做的那样。repeatWhen
的内部函数中,您需要返回新的延迟可观察源,因此observable.delay()
您必须返回Observable.timer()
。工作代码:
public void testRepeat() throws InterruptedException {
logger.info("test start");
int DEFAULT_DELAY = 100; // ms
int ADDITIONAL_DELAY = 100; // ms
AtomicInteger generator = new AtomicInteger(0);
AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
.repeatWhen(counts -> {
AtomicInteger retryCounter = new AtomicInteger(0);
return counts.flatMap(c -> {
int retry = 0;
if (connectionAlive.get()) {
retryCounter.set(0); // reset counter
} else {
retry = retryCounter.incrementAndGet();
}
int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
});
})
.subscribe(v -> logger.info("got {}", v));
Thread.sleep(220);
logger.info("connection dropped");
connectionAlive.set(false);
Thread.sleep(2000);
logger.info("connection is back alive");
connectionAlive.set(true);
Thread.sleep(2000);
subscription.dispose();
logger.info("test complete");
}
Run Code Online (Sandbox Code Playgroud)
repeatWhen
请参阅此处的详细文章。