从mainThread()切换到io()时出现InterruptedIOException

tas*_*ula 5 concurrency android rx-java rx-android

我有一些代码首先必须运行AndroidSchedulers.mainThread(),然后必须执行HTTP请求,因此必须运行Schedulers.io(),并在UI 上处理结果,所以回到AndroidSchedulers.mainThread().

我收到InterruptedIOException从切换时AndroidSchedulers.mainThread()Scheulers.io().

这是一些代码:

Model model = getModel();

Completable.fromAction(
    new Action0() {
        public void call() {
            mSubject.onNext(model)
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(Schedulers.io())
    .andThen(fetchFromServer())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/* handle success and error */);
...
public <T> Single<T> fetchFromServer() {
    Request request = new Request(); // some request from server, not important
    return bodyFrom2(request);
}

public <T> Single<T> bodyFrom2(final Request<T> request) {
    return Single.defer(new Callable<Single<T>>() {
                @Override
                public Single<T> call() throws Exception {
                    try {
                        Response<T> response = request.execute();
                        if (response.error() != null)
                           return Single.error(response.error().getMessage());
                        else {
                            return Single.just(response.body());
                        }
                    } catch (IOException e) {
                        return Single.error(e);
                    }
                }
            });
}

public static <T> Single<T> bodyFrom1(final Request<T> request) {
    return Single.create(new Single.OnSubscribe<T>() {
        @Override
        public void call(SingleSubscriber<? super T> subscriber) {
            try {
                Response<T> response = request.execute();
                if (subscriber.isUnsubscribed())
                    return;

                if (response.error() != null)
                    subscriber.onError(response.error().getMessage());
                else {
                    subscriber.onSuccess(response.body());
                }
            } catch (IOException e) {
                if (subscriber.isUnsubscribed())
                    return;

                subscriber.onError(e);
            }
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

抛出异常bodyFrom()(1或2),at request.execute().我用了bodyFrom1(),但是我在SO上发现了这个问题,并考虑尝试使用第二个问题.无论如何,我收到了例外.

试图找出问题的原因和地点,我试过这个:

Completable.complete()
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(Schedulers.io())
    .andThen(fetchFromServer())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/* handle success and error */);
Run Code Online (Sandbox Code Playgroud)

仍然抛出InterruptedIOException,这:

Completable.complete()
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .andThen(fetchFromServer())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/* handle success and error */);
Run Code Online (Sandbox Code Playgroud)

哪个工作正常.

编辑:

  • 它似乎工作,如果我使用ObservableSingle代替Completable.
  • 在RxAndroid的Github上添加了一个问题.