close() 在我们基于 akka-streams 的 Tcp 服务器中的预期行为是客户端应该立即看到 SocketException 并且在异常之后发布的任何消息都不应该传递到服务器。但是,必须传递异常之前发送的最后一条消息。所有新客户端都应该遇到 ConnectException。
无法破译akka-streams 1.0 文档中关于“关闭连接”的段落,我们正在为 Tcp 服务器使用如下实现 close():
val serverSource: Promise[ByteString] = Promise[ByteString]()
var serverBindingOption: Option[Tcp.ServerBinding] = None
def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int)
(implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[SyslogStreamServer] = {
implicit val executionContext = system.dispatcher
val sink = Sink.foreach {
conn: Tcp.IncomingConnection =>
val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))
val targetSink = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = MaxFrameLength, allowTruncation = true))
.map(raw ? Message(raw))
.to(Sink(targetSubscriber))
conn.flow.to(targetSink).runWith(Source(serverSource.future).drop(1)) // …Run Code Online (Sandbox Code Playgroud)