How do I properly handle onError in RxJava (Android)?

a_d*_*zik 27 android reactive-programming rx-java

I'm fetching the list of apps installed on the device. This is an expensive operation, so I'm using Rx for it:

    Observable<List> observable = Observable.create(subscriber -> {
        List result = getUserApps();

        subscriber.onNext(result);
        subscriber.onError(new Throwable());
        subscriber.onCompleted();
    });

    observable
            .map(s -> {
                ArrayList<String> list = new ArrayList<>();
                ArrayList<Application> applist = new ArrayList<>();
                for (Application p : (ArrayList<Application>) s) {
                    list.add(p.getAppName());
                    applist.add(p);
                }
                return applist;
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
            .subscribe(s -> createListView(s, view));

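My problem, however, is error handling. Normally the user opens this screen, waits for the apps to load, picks what suits them best, and moves on to the next page. But when the user switches screens quickly, the app crashes with a NullPointerException.

OK, so I implemented this onError. It still doesn't work, though, and in the use case above it throws this: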

    04-15 18:12:42.530  22388-22388/pl.digitalvirgo.safemob E/AndroidRuntime? FATAL EXCEPTION: main
        java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:52)
                at android.os.Handler.handleCallback(Handler.java:730)
                at android.os.Handler.dispatchMessage(Handler.java:92)
                at android.os.Looper.loop(Looper.java:176)
                at android.app.ActivityThread.main(ActivityThread.java:5419)
                at java.lang.reflect.Method.invokeNative(Native Method)
                at java.lang.reflect.Method.invoke(Method.java:525)
                at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1046)
                at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:862)
                at dalvik.system.NativeStart.main(Native Method)
         Caused by: rx.exceptions.OnErrorNotImplementedException
                at rx.Observable$31.onError(Observable.java:7134)
                at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
                at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
                at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
                at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:177)
                at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.access$000(OperatorObserveOn.java:65)
                at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:153)
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
                at android.os.Handler.handleCallback(Handler.java:730)
                at android.os.Handler.dispatchMessage(Handler.java:92)
                at android.os.Looper.loop(Looper.java:176)
                at android.app.ActivityThread.main(ActivityThread.java:5419)
                at java.lang.reflect.Method.invokeNative(Native Method)
                at java.lang.reflect.Method.invoke(Method.java:525)
                at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1046)
                at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:862)
                at dalvik.system.NativeStart.main(Native Method)
         Caused by: java.lang.Throwable
                at pl.digitalvirgo.safemob.fragments.wizard.ApplicationsFragment.lambda$getAppList$25(ApplicationsFragment.java:267)
                at pl.digitalvirgo.safemob.fragments.wizard.ApplicationsFragment.access$lambda$2(ApplicationsFragment.java)
                at pl.digitalvirgo.safemob.fragments.wizard.ApplicationsFragment$$Lambda$3.call(Unknown Source)
                at rx.Observable$1.call(Observable.java:145)
                at rx.Observable$1.call(Observable.java:137)
                at rx.Observable.unsafeSubscribe(Observable.java:7304)
                at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
                at java.util.concurrent.FutureTask.run(FutureTask.java:234)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
                at java.lang.Thread.run(Thread.java:841)

How should I handle this properly?

Luk*_*yga 21

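.doOnError() is an operator, and as such it is not part of the Subscriber.

Therefore, having a .doOnError() does not count as implementing onError(). That is why the stack trace shows OnErrorNotImplementedException: subscribe() was given only an onNext action.

As for the question in one of the comments: yes, of course you can use lambdas.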

In this case, simply replace

    .doOnError(throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
    .subscribe(s -> createListView(s, view))

with

    .subscribe(s -> createListView(s, view),
        throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
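
To see why the second form behaves differently, here is a minimal plain-Java sketch of the mechanism. It is a simplified model, not RxJava's actual classes: MiniObserver, ErrorNotHandledException, doOnError, onNextOnly and withErrorHandler are all hypothetical names made up for illustration. The only assumptions are that doOnError runs a side effect and then forwards the error, and that subscribing without an error callback turns an incoming error into an exception, which is what the stack trace in the question suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for an Rx-style chain; these are NOT RxJava's real classes. */
class DoOnErrorModel {

    /** Hypothetical minimal observer: the final consumer of the stream. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
    }

    /** Hypothetical stand-in for RxJava's OnErrorNotImplementedException. */
    static class ErrorNotHandledException extends RuntimeException {
        ErrorNotHandledException(Throwable cause) { super(cause); }
    }

    /** Models doOnError: run a side effect, then pass the same error downstream. */
    static <T> MiniObserver<T> doOnError(Consumer<Throwable> sideEffect, MiniObserver<T> downstream) {
        return new MiniObserver<T>() {
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable error) {
                sideEffect.accept(error);   // e.g. logging
                downstream.onError(error);  // the error keeps travelling
            }
        };
    }

    /** Models subscribe(onNext): no error callback, so an error becomes a crash. */
    static <T> MiniObserver<T> onNextOnly(Consumer<T> nextAction) {
        return new MiniObserver<T>() {
            @Override public void onNext(T item) { nextAction.accept(item); }
            @Override public void onError(Throwable error) {
                throw new ErrorNotHandledException(error);
            }
        };
    }

    /** Models subscribe(onNext, onError): the error ends in the callback. */
    static <T> MiniObserver<T> withErrorHandler(Consumer<T> nextAction, Consumer<Throwable> errorAction) {
        return new MiniObserver<T>() {
            @Override public void onNext(T item) { nextAction.accept(item); }
            @Override public void onError(Throwable error) { errorAction.accept(error); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Throwable failure = new IllegalStateException("boom");

        // doOnError + subscribe(onNext): the side effect runs, then it still crashes.
        try {
            DoOnErrorModel.<String>doOnError(e -> log.add("logged"), onNextOnly(s -> { }))
                    .onError(failure);
        } catch (ErrorNotHandledException e) {
            log.add("crashed");
        }

        // subscribe(onNext, onError): the error is handled and nothing is thrown.
        DoOnErrorModel.<String>withErrorHandler(s -> { }, e -> log.add("handled"))
                .onError(failure);

        System.out.println(log); // prints [logged, crashed, handled]
    }
}
```

In this model the logging side effect alone never stops the error; only the observer at the end of the chain decides whether it is handled or rethrown.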


inm*_*yth 12

My take: you are probably using an Action1 in

    .subscribe(s -> createListView(s, view));

You need to replace it with a Subscriber or an Observer, both of which declare an abstract onError method. That is the method that ends up being invoked by subscriber.onError(new Throwable());

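EDIT: This is how I would do it. On closer inspection, I think the main problem in your code is the early part, where subscriber.onError is called even when there is no error. You probably don't need map either, since you are technically passing the data through as-is without transforming it. I've left it in anyway in case you need it later.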

    Observable.create(new Observable.OnSubscribe<Application>() {
        @Override
        public void call(Subscriber<? super Application> subscriber) {
            List<Application> result = getUserApps();
            if (result != null) {
                for (Application app : result) {
                    subscriber.onNext(app);
                }
                subscriber.onCompleted();
            } else {
                subscriber.onError(new IOException("no permission / no internet / etc"));
                // or, if this comes from a try/catch, pass the caught exception instead
            }
        }
    })
    .subscribeOn(Schedulers.io()) // the thread the OnSubscribe code above runs on
    .observeOn(AndroidSchedulers.mainThread()) // the thread everything below runs on
    .map(new Func1<Application, String>() {

        // Mapping functions are where data is transformed.
        // You can simply skip this and
        // do the same thing in the Subscriber implementation.
        @Override
        public String call(Application application) {
            return application.getAppName();
        }
    }).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Toast.makeText(context, "completed", Toast.LENGTH_SHORT).show();
            // the subscriber runs on the main UI thread, so UI work is fine here:
            // raise a Toast, play a sound, etc.
        }

        @Override
        public void onError(Throwable e) {
            // pass the Throwable itself; e.getMessage() may be null
            Log.e("getAppsError", "Failed to load apps", e);
            // raise a Toast, play a sound, etc.
        }

        @Override
        public void onNext(String s) {
            listAdapter.add(s);
        }
    });
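
The rule behind the code above (emit the items, then signal exactly one of onCompleted or onError) can also be sketched without RxJava, for the try/catch case mentioned in the comment. This is only an illustrative model: MiniObserver, emit and recorder are hypothetical names, not part of any library, and the app names are made-up sample data.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

/** Plain-Java sketch of "items, then exactly one terminal event"; NOT RxJava's API. */
class TerminalEventModel {

    /** Hypothetical observer with the same three callbacks as an RxJava 1.x Subscriber. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Runs the source and signals either onNext... + onCompleted, or a single onError. */
    static <T> void emit(Callable<List<T>> source, MiniObserver<? super T> observer) {
        List<T> items;
        try {
            items = source.call();
        } catch (Exception e) {
            observer.onError(e); // failure path: onError only
            return;              // never followed by onNext or onCompleted
        }
        for (T item : items) {
            observer.onNext(item);
        }
        observer.onCompleted();  // success path: no onError
    }

    /** Records every callback as text so the order of events is visible. */
    static MiniObserver<String> recorder(List<String> events) {
        return new MiniObserver<String>() {
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        };
    }

    public static void main(String[] args) {
        List<String> ok = new ArrayList<>();
        TerminalEventModel.<String>emit(() -> Arrays.asList("Maps", "Mail"), recorder(ok));
        System.out.println(ok); // prints [next:Maps, next:Mail, completed]

        List<String> failed = new ArrayList<>();
        TerminalEventModel.<String>emit(() -> { throw new IOException("no permission"); }, recorder(failed));
        System.out.println(failed); // prints [error:no permission]
    }
}
```

The code in the question calls onNext, onError and onCompleted one after another on every run, which is exactly the sequence this shape avoids.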


Tob*_*iug 6

Here is a newcomer's answer (I'm new to RxJava myself and eventually got past this problem):

This is essentially your implementation:

    Observable.create(new Observable.OnSubscribe<RegionItem>() {
                @Override
                public void call(Subscriber<? super RegionItem> subscriber) {
                    subscriber.onError(new Exception("TADA !"));
                }
            })
            .doOnNext(actionNext)
            .doOnError(actionError)
            .doOnCompleted(actionCompleted)
            .subscribe();

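With this implementation, as soon as I subscribe I trigger the error flow, and the application crashes.

The problem is that you have to handle the error in the subscribe() call. doOnError(...) is just a kind of helper: it lets you observe the error and gives you a place to run some action when one occurs, but the error still propagates to the subscriber. It does not handle the error.

So you have to change your code to this: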

    Observable.create(new Observable.OnSubscribe<RegionItem>() {
                @Override
                public void call(Subscriber<? super RegionItem> subscriber) {
                    subscriber.onError(new Exception("TADA !"));
                }
            })
            .subscribe(actionNext, actionError, actionCompleted);

I'm not sure about the full explanation, but this is how I fixed it. Hope it helps.