订阅被清除时,处理Observable.fromCallable()中的异常

vki*_*ins 0 android rx-java rx-java2

我有一个情况,一个长时间运行的进程被包装在一个Observable.fromCallable().这个过程是一个OkHttp调用,如果终止,将抛出一个IOException.如果订阅了observable,则将一次性存储在a中CompositeDisposable,并按预期处理异常.但是,CompositeDisposable在某些情况下,我的代码将清除,在OkHttp没有错误处理的情况下触发线程终止,导致应用程序因未处理的异常而崩溃.这是这个问题的简单单元测试示例:

@Test
public void test(){
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
            Thread.sleep(3000);
            return null;
        }
    });
    compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    compositeDisposable.clear();
}
Run Code Online (Sandbox Code Playgroud)

有没有办法解决这个问题?

yos*_*riz 7

与RxJava1不同,RxJava2不会向订阅者提供此异常onError(),因为您调用cancel()取消订阅并且不再接收通知,因此现在默认情况下使用取消订阅代码发生的这种异常现在Thread.currentThread().getUncaughtExceptionHandler().uncaughtException().

您可以使用try catch包装这种可能在cancel中发生的异常,或者覆盖默认行为:

RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer()); 
Run Code Online (Sandbox Code Playgroud)

或任何其他你想要的处理.

您还应该阅读akarnokd在RxJava github上的完整解释.关于上述解决方案,
也请参考本讨论.