RxJava在与单元测试相同的线程上订阅和观察

soc*_*qwe 5 rx-java rx-android

我想为组件编写一种"blackbox测试",内部使用RxJava.

在内部,它使用Retrofit返回一个Observable来制作一个httpcall,然后用它.flatmap() 来对从改造中检索到的数据进行处理.我的想法是给这个组件一个Transformer用于在观察者上设置调度程序,如下所示:

class DefaultTransformer <T> implements Transformer<T, T> {

   public Observable<T> call(Observable<T> observable) { 
      return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
   }
}
Run Code Online (Sandbox Code Playgroud)

我的组件做了这样的事情:

void execute(Transformer<T, T> scheduler){
     Observable<List<Team>> observable = retrofitApi.getLeague(leagueId, seasonId)
        .flatMap(new Func1<LeagueWrapper, Observable<List<Team>>>() {
          @Override public Observable<List<Team>> call(LeagueWrapper wrapper) {                
             return Observable.just(wrapper.getLeague().getTeams());
          }
        });

   observable.compose(transformer);

   observable.subscribe(this);
}
Run Code Online (Sandbox Code Playgroud)

在生产中我DefaultTransformer作为参数传递,但是对于单元测试,我想提交一个Transformer在单元测试的同一个线程上运行,所以一切都应该同步运行(不是异步).

我试过了:

class UnitTestTransformer <T> implements Transformer<T, T> {

       public Observable<T> call(Observable<T> observable) { 
          return observable.subscribeOn(Schedulers.test()).observeOn(AndroidSchedulers.test());
       }
    }
Run Code Online (Sandbox Code Playgroud)

但它仍然在我的单元测试中运行异步.我也试过了Scheduler.immediate().toBlocking()似乎不是一个选择,因为它Observable不再是一个选择.知道什么可能是错的吗?

Ros*_*ick 3

如果无法更改调用方式execute(),您可能需要尝试使用 RxJava 插件机制。

https://github.com/ReactiveX/RxJava/wiki/Plugins

您可以提供:

  • RxJavaSchedulersHook覆盖测试执行期间提供的调度程序并让它们同步执行
  • RxJavaObservableExecutionHook挂钩 Observable 执行管道并使用某种同步方法(如 CountdownLatch 等待 Observable 订阅完成后再继续