取消对信号的 kotlin 流收集

And*_*ndy 9 kotlin kotlin-coroutines

我正在努力为 Flow 创建一个“takeUntilSignal”运算符 - 一种扩展方法,当另一个流生成输出时将取消一个流。

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>
Run Code Online (Sandbox Code Playgroud)

我最初的努力是尝试在与主要流收集相同的协程范围内启动信号流的收集,并取消协程范围:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    kotlinx.coroutines.withContext(coroutineContext) {
        launch {
            signal.take(1).collect()
            println("signalled")
            cancel()
        }
        collect {
            emit(it)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

但这不起作用(并使用禁止的“withContext”方法,该方法被 Flow 明确剔除以防止使用)。

编辑 我把以下可憎的东西混在一起,这不太符合定义(结果流只会在从主流第一次发射后取消),我觉得有一个更好的方法:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    combine(
        signal.map { it as Any? }.onStart { emit(null) }
    ) { x, y -> x to y }
        .takeWhile { it.second == null }
        .map { it.first }
Run Code Online (Sandbox Code Playgroud)

edit2 再次尝试,使用 channelFlow:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    channelFlow {
        launch {
            signal.take(1).collect()
            println("hello!")
            close()
        }
        collect { send(it) }
        close()
    }
Run Code Online (Sandbox Code Playgroud)

Ren*_*ene 6

couroutineScope在里面使用并启动新的协程:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    try {
        coroutineScope {
            launch {
                signal.take(1).collect()
                println("signalled")
                this@coroutineScope.cancel()
            }

            collect {
                emit(it)
            }
        }

    } catch (e: CancellationException) {
        //ignore
    }
}
Run Code Online (Sandbox Code Playgroud)