如何在后台线程上执行flatMap

Igo*_*pov 11 java multithreading android reactive-programming rx-java

我正在使用Retrofit和RxJava来执行一些后台任务.代码如下所示:

public class MyLoader{  
  public Observable<MyData> getMyData(){
      return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() {
              @Override
              public Observable<MyData> call(MyHelper myHelper) {
                  return queryData(myHelper);
              }
      });
  }

  private Observable<MyData> queryData(MyHelper myHelper){
      ...
  }

  private Observable<MyHelper> setupHelper(){
     return Observable.create(new Observable.OnSubscribe<MyHelper>() {
          @Override
          public void call(final Subscriber<? super MyHelper> subscriber) {
              try{
                MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data
                subscriber.onNext(helper);
                subscriber.onCompleted();
              }catch(RetrofitError e){
                subscriber.onError(e)
              }
          }
     }
  }
}
Run Code Online (Sandbox Code Playgroud)

由于NetworkOnMainThread此行的异常导致RetrofitError失败:

  MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data
Run Code Online (Sandbox Code Playgroud)

订阅我的Observable:

myLoader.getMyData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<MyData>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(MyData inventory) {

                    }
                });
Run Code Online (Sandbox Code Playgroud)

根据Rx文档flatMap,不对任何后台线程进行操作.我的问题是如何确保整个getMyData()方法在后台运行.

Dio*_*lor 0

当您在主线程中创建 MyLoader 对象时,很有可能 Observable.create 也会被执行(或者可能在代码之前的其他地方(?))。如果是这样,则.subscribeOn(Schedulers.io())对更改线程没有任何影响。

您可以尝试用 a 包装 .create().defer()以确保仅在订阅 Observable 时才创建它。

例如defer(() -> create(....))