Sor*_*ona 6 broadcast websocket akka playframework akka-stream
我正在编写一个纯websocket web应用程序,这意味着在websocket升级之前没有用户/客户端步骤,更具体地说:身份验证请求和其他通信一样遍历websockets
有/是:
现在,并非每个客户端都可以访问每个项目 - 对它的访问控制是在服务器端(ofc)实现的,并且与websockets本身无关.
我的问题是,我希望允许协作,这意味着N个客户可以一起处理1个项目.
现在,如果其中一个客户端修改了某些内容,我想通知正在处理该项目的所有其他客户端.
这一点尤其重要,因为atm我是唯一一个正在研究这个并进行测试的人,这是我身上的重大疏忽,因为现在:
如果客户端A连接到Project X并且客户端B连接到Proejct Y,如果其中任何一个在其各自的项目中更新某些内容,则另一个会收到有关这些更改的通知.
现在我的WebsocketController相当简单,我基本上有这个:
private val fanIn = MergeHub.source[AllowedWSMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()
def handle: WebSocket = WebSocket.accept[AllowedWSMessage, AllowedWSMessage]
{
_ => Flow.fromSinkAndSource(fanIn, fanOut)
}
Run Code Online (Sandbox Code Playgroud)
现在从我的理解,我需要的是
1)每个项目有多个websocket端点,例如/ api/{project_identifier}/ws
(X)或
2)根据他们正在工作的项目拆分WebSocket连接/连接客户端的一些方法.
因为我不想去路线1)我将分享我的想法2):
我现在没有看到解决方法的问题是,我可以在服务器端轻松创建一些集合,在那里我存储哪个用户在任何给定时刻处理哪个项目(例如,如果他们选择/切换项目,客户端将其发送到服务器并存储此信息)
但我仍然有那个fanOut,所以这不会解决我在WebSocket/AkkaStreams方面的问题.
是否有一些魔法(过滤)被调用,BroadcastHub这是我想要的?
编辑:现在在这里共享我的整个websocket逻辑,尝试后但未能应用@James Roper的良好提示:
class WebSocketController @Inject()(implicit cc: ControllerComponents, ec: ExecutionContext, system: ActorSystem, mat: Materializer) extends AbstractController(cc)
Run Code Online (Sandbox Code Playgroud)
{val logger:Logger = Logger(this.getClass())
type WebSocketMessage = Array[Byte]
import scala.concurrent.duration._
val tickingSource: Source[WebSocketMessage, Cancellable] =
Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
.map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)
private val generalActor = system.actorOf(Props
{
new myActor(system, "generalActor")
}, "generalActor")
private val serverMessageSource = Source
.queue[WebSocketMessage](10, OverflowStrategy.backpressure)
.mapMaterializedValue
{ queue => generalActor ! InitTunnel(queue) }
private val sink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(generalActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
private val source: Source[WebSocketMessage, Cancellable] = tickingSource.merge(serverMessageSource)
private val fanIn = MergeHub.source[WebSocketMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()
// TODO switch to WebSocket.acceptOrResult
def handle: WebSocket = WebSocket.accept[WebSocketMessage, WebSocketMessage]
{
//_ => createFlow()
_ => Flow.fromSinkAndSource(fanIn, fanOut)
}
private val projectHubs = TrieMap.empty[String, (Sink[WebSocketMessage, NotUsed], Source[WebSocketMessage, NotUsed])]
private def buildProjectHub(projectName: String) =
{
logger.info(s"building projectHub for $projectName")
val projectActor = system.actorOf(Props
{
new myActor(system, s"${projectName}Actor")
}, s"${projectName}Actor")
val projectServerMessageSource = Source
.queue[WebSocketMessage](10, OverflowStrategy.backpressure)
.mapMaterializedValue
{ queue => projectActor ! InitTunnel(queue) }
val projectSink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(projectActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
val projectSource: Source[WebSocketMessage, Cancellable] = tickingSource.merge(projectServerMessageSource)
val projectFanIn = MergeHub.source[WebSocketMessage].to(projectSink).run()
val projectFanOut = projectSource.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()
(projectFanIn, projectFanOut)
}
private def getProjectHub(userName: String, projectName: String): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
logger.info(s"trying to get projectHub for $projectName")
val (sink, source) = projectHubs.getOrElseUpdate(projectName, {
buildProjectHub(projectName)
})
Flow.fromSinkAndSourceCoupled(sink, source)
}
private def extractUserAndProject(msg: WebSocketMessage): (String, String) =
{
Wrapper.parseFrom(msg).`type` match
{
case m: MessageType =>
val message = m.value
(message.userName, message.projectName)
case _ => ("", "")
}
}
private def createFlow(): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
// broadcast source and sink for demux/muxing multiple chat rooms in this one flow
// They'll be provided later when we materialize the flow
var broadcastSource: Source[WebSocketMessage, NotUsed] = null
var mergeSink: Sink[WebSocketMessage, NotUsed] = null
Flow[WebSocketMessage].map
{
m: WebSocketMessage =>
val msg = Wrapper.parseFrom(m)
logger.warn(s"client sent project related message: ${msg.toString}");
m
}.map
{
case isProjectRelated if !extractUserAndProject(isProjectRelated)._2.isEmpty =>
val (userName, projectName) = extractUserAndProject(isProjectRelated)
logger.info(s"userName: $userName, projectName: $projectName")
val projectFlow = getProjectHub(userName, projectName)
broadcastSource.filter
{
msg =>
val (_, project) = extractUserAndProject(msg)
logger.info(s"$project == $projectName")
(project == projectName)
}
.via(projectFlow)
.runWith(mergeSink)
isProjectRelated
case other =>
{
logger.info("other")
other
}
} via {
Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[WebSocketMessage], MergeHub.source[WebSocketMessage])
{
(source, sink) =>
broadcastSource = source
mergeSink = sink
source.filter(extractUserAndProject(_)._2.isEmpty)
.map
{ x => logger.info("Non project related stuff"); x }
.via(Flow.fromSinkAndSource(fanIn, fanOut))
.runWith(sink)
NotUsed
}
}
}
Run Code Online (Sandbox Code Playgroud)
}
解决方案/理念我如何理解它:
1)我们有一个"包装器流",其中我们有一个为null的broadcastSource和mergeSink,直到我们在外部} via {块中实现它们为止
2)在"包装流程"中,我们映射每个元素以检查它.
I)如果项目相关,我们
a)为项目获取/创建自己的子流b)根据项目名称过滤元素c)让那些通过过滤器的子项被子/项目流使用,以便连接到项目的每个人都获得该元素
II)如果它与项目无关,我们只是传递它
3)我们的包装流程是通过"按需"物化流程进行的,在实现via它的地方,我们将非项目相关的元素分发给所有连接的Web套接字客户端.
总结一下:我们有一个websocket连接的"包装流",它可以通过projectFlow或generalFlow进行,具体取决于它正在使用的消息/元素.
我现在的问题是(并且它似乎是微不足道的,但我正在以某种方式挣扎)每条消息都应该进入myActor(atm)并且应该有消息从那里出来(见serverMesssageSource和source)
但是上面的代码创建了非确定性结果,例如一个客户端发送了2条消息,但是有4条正在处理(根据日志和服务器发回的结果),有时消息在从控制器到演员的路上突然丢失.
我无法解释这一点,但如果我离开它只是_ => Flow.fromSinkAndSource(fanIn, fanOut)每个人都得到了一切,但至少如果只有一个客户端它完全符合预期(显然:))
我实际上建议使用 Play 的socket.io 支持。这提供了命名空间,从您的描述中我可以看出,它可以直接实现您想要的内容 - 每个命名空间都是其自己独立管理的流程,但所有命名空间都遵循相同的 WebSocket。我今天写了一篇关于为什么您可能选择使用 socket.io 的博客文章。
如果你不想使用socket.io,我在这里有一个例子(它使用socket.io,但不使用socket.io命名空间,所以可以很容易地适应在直接WebSockets上运行),它显示了多聊天房间协议 - 它将消息送入 BroadcastHub,然后为用户当前所在的每个聊天室提供一个对中心的订阅(对于您来说,这将是针对每个项目的一个订阅)。每个订阅都会过滤来自中心的消息,以仅包含该订阅聊天室的消息,然后将消息馈送到该聊天室 MergeHub。
这里突出显示的代码根本不是特定于socket.io的,如果您可以将WebSocket连接改编为 的流ChatEvent,则可以按原样使用:
要满足通过每个人都连接到的广播频道定向非项目特定消息的要求,首先创建该频道:
val generalFlow = {
val (sink, source) = MergeHub.source[NonProjectSpecificEvent]
.toMat(BroadcastHub.sink[NonProjectSpecificEvent])(Keep.both).run
Flow.fromSinkAndSourceCoupled(sink, source)
}
Run Code Online (Sandbox Code Playgroud)
然后,当每个已连接的 WebSocket 的广播接收器/源连接时,将其附加(这来自聊天示例:
} via {
Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[YourEvent], MergeHub.source[YourEvent]) { (source, sink) =>
broadcastSource = source
mergeSink = sink
source.filter(_.isInstanceOf[NonProjectSpecificEvent])
.via(generalFlow)
.runWith(sink)
NotUsed
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
252 次 |
| 最近记录: |