Schedulers.io()没有返回主线程

pab*_*dez 4 multithreading android parse-platform rx-java rx-android

我正在使用RxParse来解析查询的异步加载但是当我使用subscribeOn(Schedulers.io())订阅我的observable时,我的onCompleted方法永远不会在主线程上调用.而不是这个,我的onCompleted方法在工作线程池内调用.如果我使用observeOn(AndroidSchedulers.mainThread),一切都会工作,但我的onNextMethod也将在主线程上调用,我不想要它.

我的代码有问题吗?

我的代码有什么问题吗?

ParseObservable.find(myQuery)
    .map(myMapFunc())
    .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
    .subscribe(
        new Subscriber<MyObj>() {
           @Override
            public void onError(Throwable e) {
                Log.e("error","error",e);
            }

            @Override
            public void onNext(T t) {
                // ... worker thread (but here is ok)
            }

            public void onCompleted() {
                // ... worker thread again instead of mainThread
            }
        }
    )
);
Run Code Online (Sandbox Code Playgroud)

Tan*_*.7x 9

首先你要明白之间的差别subscribeOn()observeOn().这两个完全不同的运算符会影响Rx链的不同部分.

subscribeOn()指定Observable将在哪里工作.这不会影响在那里onNext(),onError()onComplete()执行.

observeOn()指定回调(例如onNext())的执行位置.它不会影响Observable的工作位置.

所有回调都将在同一个线程上发生.您不能指定某个回调在一个线程上发生,而某些回调在另一个线程上发生,通过任何RxJava API.如果这是您想要的行为,您将不得不在回调中自己实现它.


Dei*_*zan 6

不幸的是,订阅在所有方法的同一个线程中(onNext,onErroronCompleted

但你可以在方法Schedulers.io()内部和内部观察onNext(T t),创建一个新的Observable,听听MainThread如下:

ParseObservable.find(myQuery)
    .map(myMapFunc())
    .subscribeOn(Schedulers.io())
    .subscribe(
        new Subscriber<MyObj>() {
           @Override
            public void onError(Throwable e) {
                Log.e("error","error",e);
            }

            @Override
            public void onNext(T t) {
                Observable.just(t)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe((t) -> {
                         // do something in MainThread
                    })
            }

            public void onCompleted() {
                // ... worker thread again instead of mainThread
            }
        }
    )
);
Run Code Online (Sandbox Code Playgroud)

我希望它有所帮助!