Akka-Http Websockets:如何向消费者发送相同的数据流

Cia*_*an0 4 scala akka akka-stream akka-http

我有一个客户端可以连接的WebSocket我也有一个使用akka-streams的数据流.如何使所有客户端获得相同的数据.目前他们似乎正在争夺数据.

谢谢

小智 9

你可以做的一种方法是让一个actor扩展ActorPublisher并让它订阅一些消息.

class MyPublisher extends ActorPublisher[MyData]{

  override def preStart = {
    context.system.eventStream.subscribe(self, classOf[MyData])
  }

  override def receive: Receive = {

    case msg: MyData ?
      if (isActive && totalDemand > 0) {
        // Pushes the message onto the stream
        onNext(msg)
      }
  }
}

object MyPublisher {
  def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}

case class MyData(data:String)
Run Code Online (Sandbox Code Playgroud)

然后,您可以使用该actor作为流的源:

val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))
Run Code Online (Sandbox Code Playgroud)

然后,您可以从该数据源创建流并应用转换以将数据转换为websocket消息

val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})
Run Code Online (Sandbox Code Playgroud)

然后,您可以在路径处理中使用该流程.

path("readings") {
  handleWebsocketMessages(myFlow)
} 
Run Code Online (Sandbox Code Playgroud)

然后,您可以从原始流的处理中将数据发布到事件流,并且该actor的任何实例都会将其拾取并放入正在为其提供websocket的流中.

  val actorSystem = ActorSystem("foo")

  val otherSource = Source.fromIterator(()  => List(MyData("a"), MyData("b")).iterator)

  otherSource.runForeach { msg ? actorSystem.eventStream.publish(MyData("data"))}
Run Code Online (Sandbox Code Playgroud)

然后,每个套接字都有自己的actor实例,为它提供所有来自单个源的数据.