小编mat*_*oyd的帖子

如何关闭 Akka-stream Tcp 服务器中的连接?

close() 在我们基于 akka-streams 的 Tcp 服务器中的预期行为是客户端应该立即看到 SocketException 并且在异常之后发布的任何消息都不应该传递到服务器。但是,必须传递异常之前发送的最后一条消息。所有新客户端都应该遇到 ConnectException。

无法破译akka-streams 1.0 文档中关于“关闭连接”的段落,我们正在为 Tcp 服务器使用如下实现 close():

  val serverSource: Promise[ByteString] = Promise[ByteString]()

  var serverBindingOption: Option[Tcp.ServerBinding] = None

  def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int)
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[SyslogStreamServer] = {
    implicit val executionContext = system.dispatcher
    val sink = Sink.foreach {
      conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = MaxFrameLength, allowTruncation = true))
          .map(raw ? Message(raw))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(serverSource.future).drop(1)) // …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream

5
推荐指数
0
解决办法
1032
查看次数

标签 统计

akka ×1

akka-stream ×1

scala ×1