如何通过 PlayFramework 中的 Websockets 将数据从 Kafka 流发送到客户端?

dca*_*234 4 scala websocket playframework apache-kafka kafka-consumer-api

我将 Playframework 与 Scala 一起使用。我正在尝试使用来自 Kafka 的数据,处理数据,然后通过 websockets 将数据推送到客户端。不幸的是,我对这项技术仍然是一个新手。

在查看文档时,他们提到当您想要创建 Websocket 时要创建一个 Actor。下面的代码来自网站。网络套接字

import play.api.libs.json.JsValue
import play.api.mvc._
import play.api.libs.streams._

class Controller4 @Inject() (implicit system: ActorSystem, materializer: Materializer) {
  import akka.actor._

  class MyWebSocketActor(out: ActorRef) extends Actor {
     import play.api.libs.json.JsValue
     def receive = {
       case msg: JsValue =>
        out ! msg
     }

     // do i include my kafka consumer here???

  }

  object MyWebSocketActor {
    def props(out: ActorRef) = Props(new MyWebSocketActor(out))
  }

  def socket = WebSocket.accept[JsValue, JsValue] { request =>
    ActorFlow.actorRef(out => MyWebSocketActor.props(out))
  }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是,我应该把 Kafka 消费者代码放在哪里。我要把它放在演员体内吗?这是最佳实践吗?我担心将它放在 kafka 消费者那里的原因是它会阻塞。

预先感谢您的帮助。

Jam*_*ard 5

Akka Streams + Reactive Kafka实际上使这变得非常简单。只需将 Kafka Source 连接到 WebSocket Source,即可通过 WebSocket 将 Kafka 消息发送到客户端。这是带有代码的完整演示: https://www.jamesward.com/2016/05/25/combining-reactive-streams-heroku-kafka-and-play-framework/