使用无限流测试 Kotlin Flow.combine

Chr*_*ner 5 kotlin kotlin-coroutines kotlin-flow

我有以下测试

@Test
fun combineUnendingFlows() = runBlockingTest {

    val source1 = flow {
        emit("a")
        emit("b")
        suspendCancellableCoroutine { /* Never complete. */ }
    }

    val source2 = flowOf(1, 2, 3)

    val combinations = mutableListOf<String>()

    combine(source1, source2) { first, second -> "$first$second" }
        .onEach(::println)
        .onEach(combinations::add)
        .launchIn(this)

    advanceUntilIdle()

    assertThat(combinations).containsExactly("a1", "b1", "b2", "b3")
}
Run Code Online (Sandbox Code Playgroud)

断言成功,但测试失败,但出现以下异常:

kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs
Run Code Online (Sandbox Code Playgroud)

我知道这是人为的,我们可以通过确保完成来轻松完成此传递source1,但我想知道为什么它会失败?runBlockingTest测试永无止境的流程的方法是否错误?

(这是与协程一起使用的1.4.0)。

Chr*_*ner 3

问题的关键似乎是runBlockingTest(可能是正确的),预计在完成时不会有正在运行的作业。

如果我们可以显式取消 提供的作用域runBlockingTest,那就没问题了,但是尝试调用.cancel()该作用域会引发异常。

这个 GitHub 问题中有关于该问题的更多讨论。

Job在这个人为的示例中,获取对创建者的引用launchIn()并在测试结束之前取消它是非常简单的。

更复杂的场景可以捕获UncompletedCoroutinesError并忽略它,因为他们预计会有未完成的协程,或者他们可以创建一个新的子协程CoroutineScope来显式取消。在这个例子中,看起来像

@Test
fun combineUnendingFlows() = runBlockingTest {
    val job = Job()
    val childScope = CoroutineScope(this.coroutineContext + job)

    val source1 = flow {
        emit("a")
        emit("b")
        suspendCancellableCoroutine { /* Never complete. */ }
    }

    val source2 = flowOf(1, 2, 3)

    val combinations = mutableListOf<String>()

    combine(source1, source2) { first, second -> "$first$second" }
        .onEach(::println)
        .onEach(combinations::add)
        .launchIn(childScope)

    advanceUntilIdle()
    assertThat(combinations).containsExactly("a1", "b1", "b2", "b3")
    job.cancel()
}

Run Code Online (Sandbox Code Playgroud)