TestScheduler 不适用于 RxJava

Mat*_*lli 5 java rx-java

我正在尝试测试一个函数,其中流的元素在延迟后一个一个地分派,我能够使用Thread.sleep. 但是,当我使用时,TestScheduler.advanceTimeBy我无法得到任何结果。

查看代码:

public Observable<Object> getDelayedObjects(Observable<Observable<Object>> objectsStreams) {
    objectsStreams.concatMap(objectsStream ->
        objectsStream.repeat().concatMap(object ->
            Observable.just(object)
                      .delay(getDuration(object), TimeUnit.MILLISECONDS)));
}
Run Code Online (Sandbox Code Playgroud)

和测试代码:

TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<Observable<Object>> objectStreamSubject = BehaviorSubject.create(objectsStream);

model.getDelayedObjects(objectStreamSubject)
        .observeOn(testScheduler)
        .subscribeOn(testScheduler)
        .subscribe(testSubscriber);

testScheduler.triggerActions();
//Thread.sleep(900) works with the default scheduler
testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
testSubscriber.assertReceivedOnNext(objects);
Run Code Online (Sandbox Code Playgroud)

更新:

检查TestScheduler使用情况我发现通常将调度程序传递给delay函数。因此,我能够通过将调度程序作为方法的参数提供getDelayedObjectsdelay. 但是,我仍然不明白为什么它之前不起作用。

Pra*_*pta 6

delay运营商,默认情况下,使用计算调度,执行基于时间延迟。该信息可以在该方法的文档中找到。在@SchedulerSupport注释中查找值,在本例中为io.reactivex:computation

出于测试目的,您必须将计算调度程序替换为TestScheduler. 为了能够更换,你将不得不使用许多覆盖之一delay运营商,它接受一个Scheduler