如何忽略错误并继续无限流?

Zie*_*iem 39 android rx-java rx-android

我想知道如何忽略异常并继续无限流(在我的情况下,位置流)?

我正在获取当前用户位置(使用Android-ReactiveLocation),然后将它们发送到我的API(使用Retrofit).

在我的情况下,当在网络调用(例如超时)期间发生异常时,调用onError方法并且流自行停止.怎么避免呢?

活动:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
Run Code Online (Sandbox Code Playgroud)

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}
Run Code Online (Sandbox Code Playgroud)

tom*_*ozb 54

您可能希望使用其中一个错误处理运算符.

  • onErrorResumeNext(?) - 指示Observable在遇到错误时发出一系列项目
  • onErrorReturn(?) - 指示Observable在遇到错误时发出特定项目
  • onExceptionResumeNext(?) - 指示Observable在遇到异常后继续发出项目(但不是另一种可抛出的项目)
  • retry(?) - 如果源Observable发出错误,请重新订阅它,希望它能完成而不会出错
  • retryWhen(?) - 如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否重新订阅源

特别是具有retryonExceptionResumeNext看你的情况看好.

  • 不要在`flatMap`之后添加它,而是在`flatMap`中添加它. (21认同)
  • 当我在`flatMap(locations - > mRestService.postLocations(locations))之后添加`onExceptionResumeNext(Observable.empty())`时``onCompleted`被调用并且流结束. (2认同)

dwu*_*sen 16

mRestService.postLocations(locations)发出一个项目,然后完成.如果发生错误,则它会发出错误,从而完成流.

当您在a中调用此方法时flatMap,错误将继续到您的"主"流,然后您的流停止.

您可以做的是将您的错误转换为另一个项目(如下所述:https://stackoverflow.com/a/28971140/476690),但不在您的主流上(因为我认为您已经尝试过),但是在mRestService.postLocations(locations).

这样,此调用将发出错误,该错误将转换为项目/另一个可观察对象,然后完成.(不打电话onError).

在消费者视图中,mRestService.postLocations(locations)将发出一个项目,然后完成,就像一切都成功一样.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
Run Code Online (Sandbox Code Playgroud)

  • 不幸的是,它不是忽略,而是发出空列表。最好不要出错 (4认同)

Alb*_*lvo 10

如果您只想忽略内的错误flatMap 而不返回任何元素,请执行以下操作:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);
Run Code Online (Sandbox Code Playgroud)

  • 这实际上会完成流。不是吗? (2认同)

All*_*ing 7

只是粘贴来自@MikeN的答案的链接信息,它就会丢失:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}
Run Code Online (Sandbox Code Playgroud)

并使用它靠近可观察的来源,因为其他运营商可能会在此之前急切地取消订阅.

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();
Run Code Online (Sandbox Code Playgroud)

但请注意,这会抑制来自源的错误传递.如果在抛出异常后链中的任何onNext,则仍可能取消订阅源.