我需要处理流收集中的当前值和先前值,因此我需要一些具有如下作用的运算符:
----A----------B-------C-----|--->
---(null+A)---(A+B)---(B+C)--|--->
Run Code Online (Sandbox Code Playgroud)
一个想法是这样的:
fun <T: Any> Flow<T>.withPrevious(): Flow<Pair<T?, T>> = flow {
var prev: T? = null
this@withPrevious.collect {
emit(prev to it)
prev = it
}
}
Run Code Online (Sandbox Code Playgroud)
但这样就无法控制执行第一个流程的上下文。有没有更灵活的解决方案?
coroutine kotlin kotlin-coroutines coroutinescope kotlin-flow
我在服务器上使用 ktor 并有一个可能执行很长时间(一分钟或更长时间)的请求。我想检测客户端请求取消以释放资源并取消长时间运行的操作。有什么办法可以做到吗?
提前致谢
我使用 ktor 通过 websockets 进行服务器端开发。
文档向我们展示了使用传入通道的示例:
for (frame in incoming.mapNotNull { it as? Frame.Text }) {
// some
}
Run Code Online (Sandbox Code Playgroud)
但mapNotNull被标记为已弃用,以支持Flow. 我应该如何使用这个API以及可能存在哪些问题?例如,这Flow是一股冷流。这意味着将在每个collect. 它在 websocket 上下文中如何工作。是否会在第二次collect调用时重新打开,或者旧消息可能会在下一次调用后传递一次collect?如何收集N消息,然后停止收集,然后再次收集?
提前致谢 :)