Rx-Kotlin awaitTerminalEvent从未获得onComplete

ssc*_*itz 3 testing unit-testing kotlin rx-kotlin rx-java2

我试图更好地理解如何使用Rx-Kotlin进行单元测试,但是我无法成功地将主题设置为"已完成".因此,我总是在等待5秒的超时(onComplete应该是立即的),然后在assertComplete上失败.

我对awaitTerminalEvent的理解是它应该只阻塞,直到调用onComplete.我也研究过TestScheduler,但我不相信这里应该要求它.

任何可以引导我朝着正确方向前进的帮助或文档都将非常感激.

@Test
fun testObservable() {
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(0)

    TestSubscriber<Int>().apply {
        subject.subscribe({
            System.out.println(it)
            subject.onNext(1)
            subject.onComplete()
        })

        this.awaitTerminalEvent(5, TimeUnit.SECONDS)
        this.assertComplete()
        this.assertValue(1)
    }
}
Run Code Online (Sandbox Code Playgroud)

yos*_*riz 5

你以错误的方式使用了错误的工具......

  • TestSubscriber是用于测试Flowable,你应该在这里使用TestObserver.
  • 你应订阅TestObserver(或TestSubscriberin Flowable),以便它监测排放并能够等待终端事件和断言值.在您的代码中,TestSubscriber它不会附加到任何流,因此它永远不会得到任何事件.

试图模仿你的代码,它可能是这样的:

 @Test
fun testObservable() {
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(0)

    TestObserver<Int>().apply {
        subject.doOnNext {
            System.out.println(it)
            subject.onNext(1)
            subject.onComplete()
        }
                .subscribe(this)

        this.awaitTerminalEvent(5, TimeUnit.SECONDS)
        this.assertComplete()
        this.assertValue(1)
    }
}  
Run Code Online (Sandbox Code Playgroud)

你可以看到,我使用的TestObserver是与完成认购TestObserver对象,主体onNext(),onComplete()移动到doOnNext().由于您有两个发射值,测试将失败,而测试仅断言单个"1"值.

一般来说,这是一种错误,你使用subject再次发出onNext()然后调用onComplete(),你可以先订阅然后发出外部.这样的事情:

TestObserver<Int>().apply {
        subject.subscribe(this)
        subject.onNext(1)
        subject.onComplete()
        ....
}
Run Code Online (Sandbox Code Playgroud)