为Rx v2 Flowable编写同步单元测试

Jor*_*gen 3 java android reactive-programming rx-java rx-android

我正在将我的项目从Rx v1转换为Rx v2,我现在正在将一些v1 Observables改为v2 Flowables.

(它在Android项目中使用Spock用Groovy编写的单元测试)

通常我会使用钩子覆盖调度程序.我仍然可以通过注册调度程序处理程序在v2中执行此操作.这Observable通过始终使用(new?)使s同步Schedulers.single().然而,Flowable由于背压机制(?),s仍然是异步的.

我尝试使用以下方法解决这个问题:

Flowable<LogEntry> flowable = Flowable.create(new FlowableOnSubscribe<LogEntry>() {
    @Override
    void subscribe(FlowableEmitter<LogEntry> emitter) throws Exception {
        for (def log : logs) {
            emitter.onNext(log)
        }

        emitter.onComplete()
    }
}, FlowableEmitter.BackpressureMode.NONE);
Run Code Online (Sandbox Code Playgroud)

但这仍然使它们异步.

我已经覆盖了这样的调度程序:

RxJavaPlugins.reset()
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.single()
    }
})

RxAndroidPlugins.reset()
RxAndroidPlugins.setMainThreadSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.from(new Executor() {
            @Override
            void execute(Runnable command) {
                command.run()
            }
        })
    }
})
Run Code Online (Sandbox Code Playgroud)

我似乎无法弄清楚为什么Observables表现得像这样同步,但是Flowables没有(在背压机制旁边)

aka*_*okd 5

Schedulers.single()是一个单线程异步调度程序.你需要Schedulers.trampoline()保持同一个线程.