RxAndroid Observable在意外的线程上运行

mro*_*627 0 android reactive-programming rx-java rx-android

我正在尝试创建一个Observable这样的,它将在一个时间间隔内从网络加载一些数据,并在用户刷新页面时.这是我到目前为止的要点:

PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.flatMap(t -> {
    // network operations that eventually return a value
    // these operations are not observables themselves
    // they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // update ui with data
}, error -> {
    // do something with error
});
Run Code Online (Sandbox Code Playgroud)

后来在刷新回调中我有:

refreshSubject.onNext(0L);
Run Code Online (Sandbox Code Playgroud)

它在间隔很好的情况下运行,但是当我刷新时,它会爆炸NetworkOnMainThreadException.我以为我用subscribeOn/ 处理了这个observeOn.我错过了什么?另外,为什么当Observer从间隔触发时这不会导致崩溃?

Ste*_*han 5

您必须将您更改subscribeOn(Schedulers.io())observeOn(Schedulers.io())并将其移动到flatMap上.原因是你的refreshSubject是一个PublishSubject,它是一个Observable和一个Observer.

因为onNext()在将结果传递给您的订阅之前,首先在实习生Observable中调用此PublishSubject.这也是当你只使用你的Observable时它工作的原因(以及interval默认情况下总是订阅计算线程的事实).

只需检查这两个片段的输出:

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.observeOn(Schedulers.io())
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
                // do something with error
            });
Run Code Online (Sandbox Code Playgroud)

VS

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
    // do something with error
});
Run Code Online (Sandbox Code Playgroud)