BroadcastHub过滤基于"资源"连接的客户端正在进行?

Sor*_*ona 6 broadcast websocket akka playframework akka-stream

我正在编写一个纯websocket web应用程序,这意味着在websocket升级之前没有用户/客户端步骤,更具体地说:身份验证请求和其他通信一样遍历websockets

有/是:

  • / api/ws上只有一个websocket端点
  • 连接到该端点的多个客户端
  • 多个客户的多个项目

现在,并非每个客户端都可以访问每个项目 - 对它的访问控制是在服务器端(ofc)实现的,并且与websockets本身无关.

我的问题是,我希望允许协作,这意味着N个客户可以一起处理1个项目.

现在,如果其中一个客户端修改了某些内容,我想通知正在处理该项目的所有其他客户端.

这一点尤其重要,因为atm我是唯一一个正在研究这个并进行测试的人,这是我身上的重大疏忽,因为现在:

如果客户端A连接到Project X并且客户端B连接到Proejct Y,如果其中任何一个在其各自的项目中更新某些内容,则另一个会收到有关这些更改的通知.

现在我的WebsocketController相当简单,我基本上有这个:

private val fanIn = MergeHub.source[AllowedWSMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()

def handle: WebSocket = WebSocket.accept[AllowedWSMessage, AllowedWSMessage]
{
  _ => Flow.fromSinkAndSource(fanIn, fanOut)
}
Run Code Online (Sandbox Code Playgroud)

现在从我的理解,我需要的是

1)每个项目有多个websocket端点,例如/ api/{project_identifier}/ws

(X)或

2)根据他们正在工作的项目拆分WebSocket连接/连接客户端的一些方法.

因为我不想去路线1)我将分享我的想法2):

我现在没有看到解决方法的问题是,我可以在服务器端轻松创建一些集合,在那里我存储哪个用户在任何给定时刻处理哪个项目(例如,如果他们选择/切换项目,客户端将其发送到服务器并存储此信息)

但我仍然有那个fanOut,所以这不会解决我在WebSocket/AkkaStreams方面的问题.

是否有一些魔法(过滤)被调用,BroadcastHub这是我想要的?

编辑:现在在这里共享我的整个websocket逻辑,尝试后但未能应用@James Roper的良好提示:

 class WebSocketController @Inject()(implicit cc: ControllerComponents, ec: ExecutionContext, system: ActorSystem, mat: Materializer) extends AbstractController(cc)
Run Code Online (Sandbox Code Playgroud)

{val logger:Logger = Logger(this.getClass())

type WebSocketMessage = Array[Byte]

import scala.concurrent.duration._

val tickingSource: Source[WebSocketMessage, Cancellable] =
  Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
    .map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)

private val generalActor = system.actorOf(Props
{
  new myActor(system, "generalActor")
}, "generalActor")

private val serverMessageSource = Source
  .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
  .mapMaterializedValue
  { queue => generalActor ! InitTunnel(queue) }

private val sink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(generalActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
private val source: Source[WebSocketMessage, Cancellable] = tickingSource.merge(serverMessageSource)

private val fanIn = MergeHub.source[WebSocketMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()

// TODO switch to WebSocket.acceptOrResult
def handle: WebSocket = WebSocket.accept[WebSocketMessage, WebSocketMessage]
  {
    //_ => createFlow()
    _ => Flow.fromSinkAndSource(fanIn, fanOut)
  }

private val projectHubs = TrieMap.empty[String, (Sink[WebSocketMessage, NotUsed], Source[WebSocketMessage, NotUsed])]

private def buildProjectHub(projectName: String) =
{
  logger.info(s"building projectHub for $projectName")

  val projectActor = system.actorOf(Props
  {
    new myActor(system, s"${projectName}Actor")
  }, s"${projectName}Actor")

  val projectServerMessageSource = Source
    .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
    .mapMaterializedValue
    { queue => projectActor ! InitTunnel(queue) }

  val projectSink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(projectActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
  val projectSource: Source[WebSocketMessage, Cancellable] = tickingSource.merge(projectServerMessageSource)

  val projectFanIn = MergeHub.source[WebSocketMessage].to(projectSink).run()
  val projectFanOut = projectSource.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()

  (projectFanIn, projectFanOut)
}

private def getProjectHub(userName: String, projectName: String): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
  logger.info(s"trying to get projectHub for $projectName")

  val (sink, source) = projectHubs.getOrElseUpdate(projectName, {
    buildProjectHub(projectName)
  })

  Flow.fromSinkAndSourceCoupled(sink, source)
}

private def extractUserAndProject(msg: WebSocketMessage): (String, String) =
{
  Wrapper.parseFrom(msg).`type` match
  {
    case m: MessageType =>
      val message = m.value
      (message.userName, message.projectName)
    case _ => ("", "")
  }
}

private def createFlow(): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
  // broadcast source and sink for demux/muxing multiple chat rooms in this one flow
  // They'll be provided later when we materialize the flow
  var broadcastSource: Source[WebSocketMessage, NotUsed] = null
  var mergeSink: Sink[WebSocketMessage, NotUsed] = null

  Flow[WebSocketMessage].map
  {
    m: WebSocketMessage =>
    val msg = Wrapper.parseFrom(m)
    logger.warn(s"client sent project related message: ${msg.toString}");
    m
  }.map
    {
      case isProjectRelated if !extractUserAndProject(isProjectRelated)._2.isEmpty =>
        val (userName, projectName) = extractUserAndProject(isProjectRelated)

        logger.info(s"userName: $userName, projectName: $projectName")
        val projectFlow = getProjectHub(userName, projectName)

        broadcastSource.filter
        {
          msg =>
            val (_, project) = extractUserAndProject(msg)
            logger.info(s"$project == $projectName")
            (project == projectName)
        }
          .via(projectFlow)
          .runWith(mergeSink)

        isProjectRelated

      case other =>
      {
        logger.info("other")
        other
      }
    } via {
      Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[WebSocketMessage], MergeHub.source[WebSocketMessage])
      {
        (source, sink) =>
          broadcastSource = source
          mergeSink = sink

          source.filter(extractUserAndProject(_)._2.isEmpty)
            .map
            { x => logger.info("Non project related stuff"); x }
            .via(Flow.fromSinkAndSource(fanIn, fanOut))
            .runWith(sink)

          NotUsed
      }
    }
}
Run Code Online (Sandbox Code Playgroud)

}

解决方案/理念我如何理解它:

1)我们有一个"包装器流",其中我们有一个为null的broadcastSource和mergeSink,直到我们在外部} via {块中实现它们为止

2)在"包装流程"中,我们映射每个元素以检查它.

I)如果项目相关,我们

a)为项目获取/创建自己的子流b)根据项目名称过滤元素c)让那些通过过滤器的子项被子/项目流使用,以便连接到项目的每个人都获得该元素

II)如果它与项目无关,我们只是传递它

3)我们的包装流程是通过"按需"物化流程进行的,在实现via它的地方,我们将非项目相关的元素分发给所有连接的Web套接字客户端.

总结一下:我们有一个websocket连接的"包装流",它可以通过projectFlow或generalFlow进行,具体取决于它正在使用的消息/元素.

我现在的问题是(并且它似乎是微不足道的,但我正在以某种方式挣扎)每条消息都应该进入myActor(atm)并且应该有消息从那里出来(见serverMesssageSourcesource)

但是上面的代码创建了非确定性结果,例如一个客户端发送了2条消息,但是有4条正在处理(根据日志和服务器发回的结果),有时消息在从控制器到演员的路上突然丢失.

我无法解释这一点,但如果我离开它只是_ => Flow.fromSinkAndSource(fanIn, fanOut)每个人都得到了一切,但至少如果只有一个客户端它完全符合预期(显然:))

Jam*_*per 2

我实际上建议使用 Play 的socket.io 支持。这提供了命名空间,从您的描述中我可以看出,它可以直接实现您想要的内容 - 每个命名空间都是其自己独立管理的流程,但所有命名空间都遵循相同的 WebSocket。我今天写了一篇关于为什么您可能选择使用 socket.io 的博客文章。

如果你不想使用socket.io,我在这里有一个例子(它使用socket.io,但不使用socket.io命名空间,所以可以很容易地适应在直接WebSockets上运行),它显示了多聊天房间协议 - 它将消息送入 BroadcastHub,然后为用户当前所在的每个聊天室提供一个对中心的订阅(对于您来说,这将是针对每个项目的一个订阅)。每个订阅都会过滤来自中心的消息,以仅包含该订阅聊天室的消息,然后将消息馈送到该聊天室 MergeHub。

这里突出显示的代码根本不是特定于socket.io的,如果您可以将WebSocket连接改编为 的流ChatEvent,则可以按原样使用:

https://github.com/playframework/play-socket.io/blob/c113e74a4d9b435814df1ccdc885029c397d9179/samples/scala/multi-room-chat/app/chat/ChatEngine.scala#L84-L125

要满足通过每个人都连接到的广播频道定向非项目特定消息的要求,首先创建该频道:

val generalFlow = {
  val (sink, source) = MergeHub.source[NonProjectSpecificEvent]
    .toMat(BroadcastHub.sink[NonProjectSpecificEvent])(Keep.both).run
  Flow.fromSinkAndSourceCoupled(sink, source)
}
Run Code Online (Sandbox Code Playgroud)

然后,当每个已连接的 WebSocket 的广播接收器/源连接时,将其附加(这来自聊天示例:

} via {
  Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[YourEvent], MergeHub.source[YourEvent]) { (source, sink) =>
    broadcastSource = source
    mergeSink = sink

    source.filter(_.isInstanceOf[NonProjectSpecificEvent])
      .via(generalFlow)
      .runWith(sink)

    NotUsed
  }
}
Run Code Online (Sandbox Code Playgroud)