我在我的Android应用程序中使用rxjava来异步处理网络请求.现在我想在一段时间过后才重试失败的网络请求.
有没有办法在Observable上使用retry()但只能在一定延迟后重试?
有没有办法让Observable知道当前正在重试(而不是第一次尝试)?
我看了一下debounce()/ throttleWithTimeout(),但他们似乎做了不同的事情.
编辑:
我想我找到了一种方法,但是我会对确认这是正确的方法或其他更好的方法感兴趣.
我正在做的是:在我的Observable.OnSubscribe的call()方法中,在我调用Subscribers onError()方法之前,我只是让Thread睡眠所需的时间.所以,要每1000毫秒重试一次,我会这样做:
@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
try {
Log.d(TAG, "trying to load all products with pid: " + pid);
subscriber.onNext(productClient.getProductNodesForParentId(pid));
subscriber.onCompleted();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e.printStackTrace();
}
subscriber.onError(e);
}
}
Run Code Online (Sandbox Code Playgroud)
由于此方法无论如何都在IO线程上运行,因此它不会阻止UI.我能看到的唯一问题是,即使是第一个错误也会报告延迟,所以即使没有重试(),也会出现延迟.如果延迟没有在错误之后应用,而是在重试之前(但不是在第一次尝试之前,显然),我会更喜欢它.
kjo*_*nes 165
您可以使用retryWhen()运算符将重试逻辑添加到任何Observable.
以下类包含重试逻辑:
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Function<Throwable, Observable<?>>() {
@Override
public Observable<?> apply(final Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
public class RetryWithDelay implements
Func1<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
用法:
// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
.retryWhen(new RetryWithDelay(3, 2000));
Run Code Online (Sandbox Code Playgroud)
McX*_*McX 16
受Paul的回答启发,如果您不关心Abhijit SarkarretryWhen所述的问题,使用rxJava2 unconditionnaly延迟重新订阅的最简单方法是:
source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
Run Code Online (Sandbox Code Playgroud)
Eru*_*aro 11
此示例适用于jxjava 2.2.2:
立即重试:
Single.just(somePaylodData)
.map(data -> someConnection.send(data))
.retry(5)
.doOnSuccess(status -> log.info("Yay! {}", status);
Run Code Online (Sandbox Code Playgroud)
延迟重试:
Single.just(somePaylodData)
.map(data -> someConnection.send(data))
.retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
.doOnSuccess(status -> log.info("Yay! {}", status)
.doOnError((Throwable error)
-> log.error("I tried five times with a 300ms break"
+ " delay in between. But it was in vain."));
Run Code Online (Sandbox Code Playgroud)
如果someConnection.send()失败,则源单失败。当发生这种情况时,retryWhen内部的可观察到的失败将发出错误。我们将发射延迟300毫秒,然后将其发送回以发出重试信号。take(5)保证我们的可观察信号在收到五个错误后将终止。retryWhen看到终止并且在第五次失败后不重试。
这是基于本克里斯滕森的片段,我看到,一个解决方案RetryWhen实例,并RetryWhenTestsConditional(我不得不改变n.getThrowable()来n为它工作).我使用evant/gradle-retrolambda使lambda符号在Android上运行,但你不必使用lambdas(尽管强烈推荐).对于延迟,我实现了指数后退,但你可以插入你想要的任何退避逻辑.为了完整性,我添加了subscribeOn和observeOn运算符.我使用ReactiveX/RxAndroid的AndroidSchedulers.mainThread().
int ATTEMPT_COUNT = 10;
public class Tuple<X, Y> {
public final X x;
public final Y y;
public Tuple(X x, Y y) {
this.x = x;
this.y = y;
}
}
observable
.subscribeOn(Schedulers.io())
.retryWhen(
attempts -> {
return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
.flatMap(
ni -> {
if (ni.y > ATTEMPT_COUNT)
return Observable.error(ni.x);
return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
});
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)
小智 8
而不是使用MyRequestObservable.retry我使用一个包装函数retryObservable(MyRequestObservable,retrycount,seconds)返回一个新的Observable来处理延迟的间接,所以我可以做
retryObservable(restApi.getObservableStuff(), 3, 30)
.subscribe(new Action1<BonusIndividualList>(){
@Override
public void call(BonusIndividualList arg0)
{
//success!
}
},
new Action1<Throwable>(){
@Override
public void call(Throwable arg0) {
// failed after the 3 retries !
}});
// wrapper code
private static <T> Observable<T> retryObservable(
final Observable<T> requestObservable, final int nbRetry,
final long seconds) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
requestObservable.subscribe(new Action1<T>() {
@Override
public void call(T arg0) {
subscriber.onNext(arg0);
subscriber.onCompleted();
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
if (nbRetry > 0) {
Observable.just(requestObservable)
.delay(seconds, TimeUnit.SECONDS)
.observeOn(mainThread())
.subscribe(new Action1<Observable<T>>(){
@Override
public void call(Observable<T> observable){
retryObservable(observable,
nbRetry - 1, seconds)
.subscribe(subscriber);
}
});
} else {
// still fail after retries
subscriber.onError(error);
}
}
});
}
});
}
Run Code Online (Sandbox Code Playgroud)
retryWhen是一个复杂的,甚至可能是错误的操作符。官方文档和此处至少有一个答案使用range运算符,如果不进行重试,该运算符将失败。查看我与 ReactiveX 成员 David Karnok 的讨论。
我通过更改和添加一个类flatMap来改进 kjones 的答案。不保留发射顺序,这对于回退延迟很重要。的,顾名思义,让用户从产生重试延迟的各种模式,包括选择回退。该代码可在我的GitHub 上获得,并带有以下测试用例:concatMapRetryDelayStrategyflatMapconcatMapRetryDelayStrategy
见setRandomJokes方法。
基于kjones 的回答,这里是 Kotlin 版本的 RxJava 2.x 重试延迟作为扩展。替换Observable为Flowable.
fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
var retryCount = 0
return retryWhen { thObservable ->
thObservable.flatMap { throwable ->
if (++retryCount < maxRetries) {
Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
} else {
Observable.error(throwable)
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后在可观察对象上使用它 observable.retryWithDelay(3, 1000)
小智 5
与kjones的答案相同,但更新到最新版本 对于RxJava 2.x版本:( 'io.reactivex.rxjava2:rxjava:2.1.3')
public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {
private final int maxRetries;
private final long retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) throws Exception {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Flowable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Flowable.error(throwable);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
用法:
// 将重试逻辑添加到现有的可观察量中。// 最多重试 3 次,延迟 2 秒。
observable
.retryWhen(new RetryWithDelay(3, 2000));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
38287 次 |
| 最近记录: |