如何测试该observable在RxJava中使用正确的调度程序?

net*_*men 7 unit-testing mockito rx-java

我有以下课程(简化):

public class Usecase<T> {
    private final Observable<T> get;
    private final Scheduler observeScheduler;

    public Usecase(Observable<T> get, Scheduler observeScheduler) {
        this.get = get;
        this.observeScheduler = observeScheduler;
    }

    public Observable<T> execute() {
        return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler);
    }
}
Run Code Online (Sandbox Code Playgroud)

我正在为它编写单元测试.如何测试subscribeOnobserveOn使用正确的值调用?

我尝试以下方法:

    Observable<String> observable = mock(Observable.class);
    Usecase<String> usecase = new Usecase(observable, Schedulers.computation());
    usecase.execute();

    verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes
    verify(observable).observeOn(Schedulers.computation());   // should pass, but fails: Missing method call for verify(mock) here
Run Code Online (Sandbox Code Playgroud)

以上失败(我认为)因为subscribeOn而且observeOnfinal方法.那么可能还有其他方法可以确保observable使用正确的调度程序?

小智 5

有一种方法可以间接访问 observable 正在运行和被观察的两个线程,这意味着您实际上可以验证Observable使用正确的调度程序。

我们仅限于按名称验证线程。幸运的是,by 使用的线程使用Schedulers.io()我们可以匹配的一致前缀命名。这是具有唯一前缀的调度程序的(完整?)列表以供参考:

  • Schedulers.io()- RxCachedThreadScheduler
  • Schedulers.newThread()- RxNewThreadScheduler
  • Schedulers.computation()- RxComputationThreadPool

要验证在IO 线程订阅了 Observable :

// Calling Thread.currentThread() inside Observer.OnSubscribe will return the
// thread the Observable is running on (Schedulers.io() in our case)
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
    s.onNext(Thread.currentThread().getName());
    s.onCompleted();
})

// Schedule the Observable
Usecase usecase = new Usecase(obs, Schedulers.immediate());
Observable usecaseObservable = usecase.execute();

// Verify the Observable emitted the name of the IO thread
String subscribingThread = usecaseObservable.toBlocking().first();
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");
Run Code Online (Sandbox Code Playgroud)

要验证在计算线程观察到Observable ,您可以使用访问用于观察的最后一个线程。TestSubscriber#getLastSeenThread

TestSubscriber<Object> subscriber = TestSubscriber.create();
UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation())
usecase.execute().subscribe(subscriber);

// The observable runs asynchronously, so wait for it to complete
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();

// Verify the observable was observed on the computation thread
String observingThread = subscriber.getLastSeenThread().getName();
assertThat(observingThread).startsWith("RxComputationThreadPool");
Run Code Online (Sandbox Code Playgroud)

尽管我使用AssertJ进行流畅的startsWith断言,但不需要第三方库或模拟