RxJava 中的指数退避

Sel*_*bor 3 java java-8 rx-java exponential-backoff

我有一个 API,它需要Observable触发一个事件。

我想返回一个,如果检测到互联网连接,则每秒Observable发出一个值,如果没有连接,则延迟时间。defaultDelaynumberOfFailedAttempts^2

我尝试了很多不同的风格,我遇到的最大问题是retryWhen'sobservable 只评估一次:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();
Run Code Online (Sandbox Code Playgroud)

有什么办法可以做我想做的事情吗?我发现了一个相关的问题(现在无法找到它搜索),但所采取的方法似乎不适用于动态值。

Yar*_*hiy 6

您的代码中有两个错误:

  1. 为了重复某些可观察的序列,该序列必须是有限的。即,interval您最好使用类似just, 或fromCallable像我在下面的示例中所做的那样。
  2. repeatWhen的内部函数中,您需要返回新的延迟可观察源,因此observable.delay()您必须返回Observable.timer()

工作代码:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}
Run Code Online (Sandbox Code Playgroud)

repeatWhen 请参阅此处的详细文章。