lic*_*hgo 5 scala akka akka-http
如何在存储变量中保持客户端(Web)连接,然后在需要时将传出消息发送到客户端(Web)?
我已经有了一些简单的代码,一旦服务器从客户端接收到消息,就可以将消息推回客户端。如何为外发邮件部分修改以下代码?
implicit val actorSystem = ActorSystem("akka-system")
implicit val flowMaterializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher
val ip = "127.0.0.1"
val port = 32000
val route = get {
pathEndOrSingleSlash {
complete("Welcome to websocket server")
}
} ~
path("hello") {
get {
handleWebSocketMessages(echoService)
}
}
def sendMessageToClient(msg : String) {
// *** How to implement this?
// *** How to save the client connection when it is first connected?
// Then how to send message to this connection?
}
val echoService = Flow[Message].collect {
// *** Here the server push back messages when receiving msg from client
case tm : TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream)
case _ => TextMessage("Message type unsupported")
}
val binding = Http().bindAndHandle(route, ip, port)
Run Code Online (Sandbox Code Playgroud)
您可以通过调用研究管道化接收器流.map。在调用中,.map您可以捕获该值,然后返回相同的消息。例如:
Flow[Message].collect {
case tm : TextMessage =>
TextMessage(Source.single("Hello ") ++ tm.textStream.via(
Flow[String].map((message) => {println(message) /* capture value here*/; message})))
case _ => TextMessage("Message type unsupported")
}
Run Code Online (Sandbox Code Playgroud)
现在,如果您的目的是处理这些值并稍后发送值,那么您想要的不是单个源到接收器的流,而是用于接收器和源的两个单独的流,您可以使用Flow.fromSinkAndSource例如
Flow.fromSinkAndSource[Message, Message](
Flow[Message].collect { /* capture values */},
// Or send stream to other sink for more processing
source
)
Run Code Online (Sandbox Code Playgroud)
很可能,这个源要么是由图 DSL、手工操作的 Actor 构建的,要么您可以考虑利用可重用的帮助器,例如MergeHub.
| 归档时间: |
|
| 查看次数: |
615 次 |
| 最近记录: |