Kafka给websocket留言

vis*_*shr 6 websocket akka-stream akka-http

我正在尝试使用reactive-kafka,akka-http和akka-stream将一个Kafka使用者写入websocket流.

  val publisherActor = actorSystem.actorOf(CommandPublisher.props)
  val publisher = ActorPublisher[String](publisherActor)
  val commandSource = Source.fromPublisher(publisher) map toMessage
  def toMessage(c: String): Message = TextMessage.Strict(c)

  class CommandPublisher extends ActorPublisher[String] {
    override def receive = {
      case cmd: String =>
        if (isActive && totalDemand > 0)
          onNext(cmd)
    }
  }

  object CommandPublisher {
    def props: Props = Props(new CommandPublisher())
  }

  // This is the route 
  def mainFlow(): Route = {
    path("ws" / "commands" ) {
       handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    } 
  }
Run Code Online (Sandbox Code Playgroud)

从kafka使用者(这里省略),我做了一个publisherActor ! commandString动态添加内容到websocket.

但是,当我启动多个客户端到websocket时,我在后端遇到此异常:

[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
  at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
  at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
  ...
Run Code Online (Sandbox Code Playgroud)

所有websocket客户端都不能使用一个流程吗?或者应该为每个客户创建流/发布者actor?

在这里,我打算向所有websocket客户端发送"当前"/"实时"通知.通知历史无关紧要,新客户需要忽略.

tac*_*cos 4

很抱歉告诉您一个坏消息,但看起来这是akka关于 的显式设计。您无法根据需要为所有客户端重用流实例。由于 Rx 模型,扇出必须是“显式的”。

我遇到的使用特定路线的示例Flow

  // The flow from beginning to end to be passed into handleWebsocketMessages
  def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] =
    Flow[Message]
      // First we convert the TextMessage to a ReceivedMessage
      .collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) }
      // Then we send the message to the dispatch actor which fans it out
      .via(dispatchActorFlow(sender))
      // The message is converted back to a TextMessage for serialization across the socket
      .map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") }

  def route =
    (get & path("chat") & parameter('name)) { name =>
      handleWebsocketMessages(websocketDispatchFlow(sender = name))
    }
Run Code Online (Sandbox Code Playgroud)

这是关于它的讨论:

这正是我在 Akka Stream 中不喜欢的,这种明确的扇出。当我从我想要处理的某个地方收到一个数据源(例如 Observable 或 Source)时,我只想订阅它,而不想关心它是冷还是热,或者是否已被其他订阅者订阅或不。这是我对河流的比喻。河流不应该关心谁喝它,饮用者也不应该关心河流的源头或有多少其他饮用者。我的示例相当于 Mathias 提供的示例,确实共享数据源,但它只是进行引用计数,您可以有 2 个订阅者,也可以有 100 个订阅者,这并不重要。在这里,我很感兴趣,但是如果您不想丢失事件或者想确保流保持始终在线,则引用计数不起作用。但随后您使用了ConnectableObservable which has connect(): Cancelable,这非常适合……Play 的 LifeCycle 插件。如果您想为新订阅者重复以前的值,您可以使用BehaviorSubject 或ReplaySubject。之后一切就正常了,不需要手动绘制连接图。... ...(来自https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html)...对于接受 Observable 并返回 Observable 的函数,我们确实有 lift,它是最接近有名称的东西,并且可以在 Monifu forSubject或其他 Observable 类型中发挥巨大作用,因为 LiftOperators1(和 2),这使得在不丢失的情况下转换 Observables 成为可能它们的类型 - 这是对 RxJava 对lift.

但是,此类函数并不等同于Processor/ Subject。不同的是,它Subject同时是消费者和生产者。这意味着订阅者无法准确控制数据源启动的时间,并且数据源本质上是 热的(意味着多个订阅者共享相同的数据源)。在 Rx 中,如果您对冷可观察量(即为每个单独订阅者启动新数据源的可观察量)建模,那就完全没问题。另一方面,在 Rx 中(一般来说),数据源只能订阅一次是不行的,仅此而已。Monifu 中此规则的唯一例外是 GroupBy 运算符生成的 Observables,但这就像确认该规则的例外。

这意味着,特别是与 Monifu 和 Reactive Streams 协议的合同的另一个限制(您不得向同一消费者多次订阅)相结合,一个 Subject或一个Processor实例是不可重用的。为了使这样的实例可重用,Rx 模型需要一个 Processor. 此外,这意味着每当您想使用 Subject/时Processor,您的数据源必须自动成为热数据源 (可在多个订阅者之间共享)。