rxjava:我可以使用retry()但是有延迟吗?

dav*_*ola 86 rx-java

我在我的Android应用程序中使用rxjava来异步处理网络请求.现在我想在一段时间过后才重试失败的网络请求.

有没有办法在Observable上使用retry()但只能在一定延迟后重试?

有没有办法让Observable知道当前正在重试(而不是第一次尝试)?

我看了一下debounce()/ throttleWithTimeout(),但他们似乎做了不同的事情.

编辑:

我想我找到了一种方法,但是我会对确认这是正确的方法或其他更好的方法感兴趣.

我正在做的是:在我的Observable.OnSubscribe的call()方法中,在我调用Subscribers onError()方法之前,我只是让Thread睡眠所需的时间.所以,要每1000毫秒重试一次,我会这样做:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}
Run Code Online (Sandbox Code Playgroud)

由于此方法无论如何都在IO线程上运行,因此它不会阻止UI.我能看到的唯一问题是,即使是第一个错误也会报告延迟,所以即使没有重试(),也会出现延迟.如果延迟没有错误之后应用,而是重试之前(但不是在第一次尝试之前,显然),我会更喜欢它.

kjo*_*nes 165

您可以使用retryWhen()运算符将重试逻辑添加到任何Observable.

以下类包含重试逻辑:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}
Run Code Online (Sandbox Code Playgroud)

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}
Run Code Online (Sandbox Code Playgroud)

用法:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));
Run Code Online (Sandbox Code Playgroud)

  • @nima我有同样的问题,将`RetryWithDelay`更改为:http://pastebin.com/6SiZeKnC (3认同)
  • `错误:(73, 20) 错误:类型不兼容:RetryWithDelay 无法转换为 Func1&lt;? 超级可观察&lt;? 扩展 Throwable&gt;,? 扩展 Observable&lt;?&gt;&gt;` (2认同)
  • 看起来RxJava重试当我最初写这个时,操作符已经改变了.我会得到答案更新. (2认同)
  • 您应该更新此答案以符合RxJava 2 (2认同)
  • 可以在 [here](https://pastebin.com/Jj1kWHKa) 中找到**并非专门**针对 `Observable` 的实现(支持所有 RxJava 2.x 兼容的 observable,基于 `Publisher` 接口)。 (2认同)

McX*_*McX 16

Paul的回答启发,如果您不关心Abhijit SarkarretryWhen所述的问题,使用rxJava2 unconditionnaly延迟重新订阅的最简单方法是:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
Run Code Online (Sandbox Code Playgroud)

您可能希望在重试时看到更多样本和解释.何时重复.


Eru*_*aro 11

此示例适用于jxjava 2.2.2:

立即重试:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);
Run Code Online (Sandbox Code Playgroud)

延迟重试:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));
Run Code Online (Sandbox Code Playgroud)

如果someConnection.send()失败,则源单失败。当发生这种情况时,retryWhen内部的可观察到的失败将发出错误。我们将发射延迟300毫秒,然后将其发送回以发出重试信号。take(5)保证我们的可观察信号在收到五个错误后将终止。retryWhen看到终止并且在第五次失败后不重试。

  • 这种方法看起来很简单,但当重试次数用完时,它会吞掉错误而不是抛出错误。不过,在某些情况下它可能会派上用场。 (2认同)

dav*_*oze 9

这是基于本克里斯滕森的片段,我看到,一个解决方案RetryWhen实例,并RetryWhenTestsConditional(我不得不改变n.getThrowable()n为它工作).我使用evant/gradle-retrolambda使lambda符号在Android上运行,但你不必使用lambdas(尽管强烈推荐).对于延迟,我实现了指数后退,但你可以插入你想要的任何退避逻辑.为了完整性,我添加了subscribeOnobserveOn运算符.我使用ReactiveX/RxAndroidAndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)

  • 这看起来很优雅但是我没有使用lamba函数,如何在没有lambas的情况下编写?@ amitai-hoze (2认同)

小智 8

而不是使用MyRequestObservable.retry我使用一个包装函数retryObservable(MyRequestObservable,retrycount,seconds)返回一个新的Observable来处理延迟的间接,所以我可以做

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}
Run Code Online (Sandbox Code Playgroud)


Abh*_*kar 7

retryWhen是一个复杂的,甚至可能是错误的操作符。官方文档和此处至少有一个答案使用range运算符,如果不进行重试,该运算符将失败。查看我与 ReactiveX 成员 David Karnok 的讨论

我通过更改和添加一个类flatMap来改进 kjones 的答案。不保留发射顺序,这对于回退延迟很重要。的,顾名思义,让用户从产生重试延迟的各种模式,包括选择回退。该代码可在我的GitHub 上获得,并带有以下测试用例:concatMapRetryDelayStrategyflatMapconcatMapRetryDelayStrategy

  1. 第一次尝试成功(无重试)
  2. 重试 1 次后失败
  3. 尝试重试 3 次但在第 2 次成功,因此不会重试第 3 次
  4. 第三次重试成功

setRandomJokes方法。


Jul*_*ipt 7

基于kjones 的回答,这里是 Kotlin 版本的 RxJava 2.x 重试延迟作为扩展。替换ObservableFlowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后在可观察对象上使用它 observable.retryWithDelay(3, 1000)

  • @Papps是的,应该可以,只需注意“flatMap”必须使用“Flowable.timer”和“Flowable.error”,即使函数是“Single&lt;T&gt;.retryWithDelay”。 (3认同)

小智 5

与kjones的答案相同,但更新到最新版本 对于RxJava 2.x版本:( 'io.reactivex.rxjava2:rxjava:2.1.3')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

用法:

// 将重试逻辑添加到现有的可观察量中。// 最多重试 3 次,延迟 2 秒。

observable
    .retryWhen(new RetryWithDelay(3, 2000));
Run Code Online (Sandbox Code Playgroud)