cer*_*ran 3 scala akka akka-stream akka-http
我过去曾成功地使用过 Akka Streams,但是,我目前很难理解为什么 Akka-HTTP 中的客户端 Websocket Streams 是按照文档中所示的方式定义和工作的。
由于 WebSocket 连接允许全双工通信,因此我希望这种连接由 Akka HTTP 中的两个独立流表示,一个用于传入流量,另一个用于传出流量。事实上,文档说明如下:
WebSocket 由两个消息流组成 [...]
它还进一步指出,传入消息由 a 表示Sink,传出消息由 a 表示Source。这是我的第一个困惑点 - 当使用两个单独的流时,您可能希望总共处理两个源和两个接收器,而不是每种类型中的一个。目前,我的猜测是传入流的源以及传出流的接收器对开发人员来说并没有多大用处,因此只是“隐藏”。
但是,将所有内容连接在一起时确实会令人困惑(请参阅上面链接的文档)。
使用时有问题的部分singleWebSocketRequest:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
Run Code Online (Sandbox Code Playgroud)
或使用时相同的部分webSocketClientFlow:
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
Run Code Online (Sandbox Code Playgroud)
这与我目前对流工作流程的理解相矛盾。
Source用于传出消息和Sink用于传入消息?上面的代码看起来像我向自己而不是服务器发送消息。Flow[Message, Message, ...]?将传出消息转换为传入消息似乎没有意义。感谢任何有助于提高我理解的帮助,谢谢。
编辑:
我在使用Source和Sink通过 WebSocket 发送数据时没有问题,我只是想了解为什么阶段的接线是这样完成的。
WebSocket 确实由两个独立的流组成,只是这些流(可能)不在同一个 JVM 上。
您有两个对等点进行通信,一个是服务器,另一个是客户端,但是从已建立的 WebSocket 连接的角度来看,差异不再重要。一个数据流是peer 1向peer 2发送消息,另一个流是peer 2向peer 1发送消息,然后这两个peer之间存在网络边界。如果您一次查看一个对等点,则您让对等点 1 接收来自对等点 2 的消息,而在第二个流中,对等点 1 正在向对等点 2 发送消息。
每个对等点都有一个用于接收部分的 Sink 和一个用于发送部分的 Source。您实际上总共有两个源和两个接收器,只是不在同一个 ActorSystem 上(假设两个对等点都在 Akka HTTP 中实现)。peer 1的Source连接到peer 2的Sink,peer 2的Source连接到peer 1的Sink。
因此,您编写了一个 Sink 来描述如何通过第一个流处理传入消息,并编写一个 Source 来描述如何通过第二个流发送消息。通常,您希望根据接收到的消息生成消息,因此您可以将这两者连接在一起,并通过不同的流路由消息,这些流描述了如何对传入消息做出反应并生成任意数量的传出消息。这Flow[Message, Message, _]并不意味着您要将传出消息转换为传入消息,而是将传入消息转换为传出消息。
的webSocketFlow是一个典型的异步边界,表示所述其他对等端的流动。它通过将传出消息发送到其他对等方并发出其他对等方发送的任何内容,将传出消息“转换”为传入消息。
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
Run Code Online (Sandbox Code Playgroud)
此流是您对等方的两个流的一半:
[message from other peer] 连接到 printSinkhelloSource 连接到 [message to the other peer]传入消息和传出消息之间没有关系,您只需打印收到的所有内容并发送一个“hello world!”。实际上,由于源在一条消息后完成,WebSocket 连接也会关闭,但是如果您将源替换为例如Source.repeat,您将不断发送(真的是泛滥)“你好,世界!” 无论传入消息的速率如何。
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
Run Code Online (Sandbox Code Playgroud)
在这里,您获取来自 的所有内容outgoing,即您要发送的消息,将其路由通过webSocketFlow,通过与其他对等方通信“转换”消息并将每个接收到的消息生成到incoming. 通常,您有一个有线协议,您可以在其中对 case class/pojo/dto 消息与有线格式进行编码和解码。
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
Run Code Online (Sandbox Code Playgroud)
或者你可以想象某种聊天服务器(啊,websockets 和聊天),它向多个客户端广播和合并消息。这应该从任何客户端获取任何消息并将其发送给每个客户端(仅用于演示,未经测试,可能不是您想要的实际聊天服务器):
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
Run Code Online (Sandbox Code Playgroud)
希望这个对你有帮助。
| 归档时间: |
|
| 查看次数: |
611 次 |
| 最近记录: |