ktor websocket flow api 是如何工作的?

Vla*_*yar 4 websocket server ktor

我使用 ktor 通过 websockets 进行服务器端开发。

文档向我们展示了使用传入通道的示例:

for (frame in incoming.mapNotNull { it as? Frame.Text }) {
    // some
}
Run Code Online (Sandbox Code Playgroud)

mapNotNull被标记为已弃用,以支持Flow. 我应该如何使用这个API以及可能存在哪些问题?例如,这Flow是一股冷流。这意味着将在每个collect. 它在 websocket 上下文中如何工作。是否会在第二次collect调用时重新打开,或者旧消息可能会在下一次调用后传递一次collect?如何收集N消息,然后停止收集,然后再次收集?

提前致谢 :)

use*_*304 5

我应该如何使用这个API以及可能存在哪些问题?

我正在使用的以及我在文档中某处的示例之一中看到的是consumeAsFlow()调用 on 的方法ReceiveChannel。这是整个片段:

webSocket("/websocket") { //this: DefaultWebSocketServerSession
    incoming
        .consumeAsFlow()
        .map { receive(it) }
        .collect()
}
Run Code Online (Sandbox Code Playgroud)

没有发现这种方法存在重大问题。您应该注意的一件事(但这也适用于非流方法)是,如果您将其放入流中,那么它将破坏 WebSocket 连接,这通常不是您想要做的事情。可能值得考虑将整个东西包装在try-catch.

是否会在第二次对方付费呼叫时重新打开,或者旧消息可能会在下次对方付费后传递一次?

您甚至可以在开始使用流中的消息之前打开 websocket。你可以看到你的内心webSocket() {}处于 的背景中DefaultWebSocketServerSession。这是您的连接管理。在您的流程中,您只是在消息到达时(在建立连接之后)一一接收消息。如果连接中断,那么您就脱离了流程。您需要先重新建立它,然后才能处理您的消息。这个建立位是通过Route.webSocket()方法完成的。我确实建议您看一下它的 Javadoc。

如果您希望在连接关闭后添加一些清理操作,您可以添加一个finally块,如下所示:

webSocket("/chat") {
    try {
        incoming
            .consumeAsFlow()
            .map { receive(it, client) }
            .collect()
    } finally {
        // cleanup
    }
}
Run Code Online (Sandbox Code Playgroud)

简而言之:collect每收到一条消息就调用一次。如果没有连接(或者连接已断开),则collect不会被调用。

如何收集N条消息,然后停止收集,然后再次收集?

这有什么用例?我认为你不应该以任何流程来做这件事。您当然可以take(n)从流程中获取项目,但您将无法再从中获取更多项目。