如果 Kotlin Flows 发出第一个值的时间过长,如何使其超时

The*_*r42 8 kotlin kotlin-flow

我有一个可能永远不会被调用的监听器。但是,如果它至少被调用一次,我有理由确信它会被调用很多次。我是 Flows 的粉丝,因此我将其包装在callbackFlow() 构建器中。为了防止永远等待,我想添加一个超时时间。我正在尝试构建流运算符,如果流的第一个元素发出时间太长,它将抛出某种超时。这是我所拥有的。

fun <T> Flow<T>.flowBeforeTimeout(ms: Long): Flow<T> = flow {
    withTimeout(ms){
        emit(first())
    }
    emitAll(this@flowBeforeTimeout.drop(1))
}
Run Code Online (Sandbox Code Playgroud)

效果还不错,这些 JUnit4 测试都通过了。还有更多通过的测试,但为了简洁起见我省略了它们。

@Test(expected = CancellationException::class)
fun `Throws on timeout`(): Unit = runBlocking {
    val testFlow = flow {
        delay(200)
        emit(1)
    }

    testFlow.flowBeforeTimeout(100).toList()
}

Run Code Online (Sandbox Code Playgroud)
@Test
fun `No distortion`(): Unit = runBlocking {
    val testList = listOf(1,2,3)

    val resultList = testList
        .asFlow()
        .flowBeforeTimeout(100)
        .toList()

    assertThat(testList.size, `is`(resultList.size))
}
Run Code Online (Sandbox Code Playgroud)

然而,这个测试没有通过。

// Fails with: Expected: is <1> but: was <2>
@Test
fun `Starts only once`(): Unit = runBlocking {
    var flowStartCount = 0

    val testFlow = flow {
        flowStartCount++
        emit(1)
        emit(2)
        emit(3)
    }

    testFlow.flowBeforeTimeout(100).toList()
    assertThat(flowStartCount, `is`(1))
}
Run Code Online (Sandbox Code Playgroud)

有没有办法防止流程在first()和之间重新启动emitAll()

wve*_*ese 2

这个结果是由drop(1)您在实施操作员时需要的建议的。因为您正在使用withTimeout,所以您的操作如下flow

  • 它启动外部流,直到第一次发射发生;
  • 如果在超时时间内发生,则取消作业并再次运行外部流程;这就是第一次发射发生两次的原因。

我偶然遇到了你同样的挑战并编写了这个运算符。它可能不是世界上最有效的代码,但它是无状态的并且通过了您的所有测试。让我知道你的想法。

fun <T> Flow<T>.flowBeforeTimeout(duration: Long): Flow<T> =
    combine(
        booleanTimer(duration), // emits false at flow start, then true after duration
        onStart<T?> { emit(null) } // emits null at flow start
    ) { hasTimedOut, value ->
        if (hasTimedOut && value == null) {
            throw CancellationException("Flow timed out after $duration ms.")
        } else {
            value
        }
    }.filterNotNull() // necessary because of the null emitted at flow start
    .distinctUntilChanged() // necessary because of the duplicate value emitted when hasTimedOut is true

private fun booleanTimer(duration: Long): Flow<Boolean> =
    flow {
        emit(false)
        delay(duration)
        emit(true)
    }
Run Code Online (Sandbox Code Playgroud)