Tva*_*roh 8 scala akka-stream akka-http
在我的场景中,客户端发送"再见"websocket消息,我需要在服务器端关闭先前建立的连接.
来自akka-http 文档:
通过从服务器逻辑中取消传入连接流(例如,将其下游连接到Sink.cancelled,将其上游连接到Source.empty),可以关闭连接.也可以通过取消IncomingConnection源连接来关闭服务器的套接字.
但是我不清楚如何考虑到这一点Sink并Source在协商新连接时设置一次:
(get & path("ws")) {
optionalHeaderValueByType[UpgradeToWebsocket]() {
case Some(upgrade) ?
val connectionId = UUID()
complete(upgrade.handleMessagesWithSinkSource(sink, source))
case None ?
reject(ExpectedWebsocketRequestRejection)
}
}
Run Code Online (Sandbox Code Playgroud)
提示:该答案基于akka-stream-experimental版本2.0-M2。该API在其他版本中可能会略有不同。
关闭连接的一种简单方法是使用PushStage:
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ?
// println("Connection closed")
ctx.finish()
case msg ?
ctx.push(msg)
}
}
Run Code Online (Sandbox Code Playgroud)
在客户端或服务器端接收到的每个元素(通常每个通过的元素Flow)都经过这样的Stage组件。在Akka中,称为完整抽象GraphStage,有关更多信息,请参见官方文档。
使用a,PushStage我们可以观察具体的传入元素的值,然后相应地转换上下文。在上面的示例中,一旦goodbye收到消息,我们将完成上下文,否则我们将通过push方法转发值。
现在,我们可以closeClient通过该transform方法将组件连接到任意流:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ? closeClient)
.map(_ ? StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
Run Code Online (Sandbox Code Playgroud)
上面的流接收a ByteString并返回a ByteString,这意味着它可以connection通过该join方法连接到。在流程内部,我们首先将字节转换为字符串,然后再将其发送到closeClient。如果PushStage没有完成流,则该元素将在流中转发,在该流中该元素将被丢弃并由stdin的某些输入替代,然后通过导线将其发送回去。如果流完成,将删除阶段组件之后的所有其他流处理步骤-现在关闭流。
| 归档时间: |
|
| 查看次数: |
2071 次 |
| 最近记录: |