带Web套接字的Akka HTTP内存泄漏

use*_*088 5 scala akka akka-stream akka-http

我有一个Web服务器,它接受传入的Websocket连接,并在Scala中用akka http实现。但是,我一直在观察应用程序的内存使用量的单调增加。经过长时间的挖掘,我发现每个内部连接都会创建一些内部Akka对象,但是在客户端断开连接后它们不会被清除。特别是此类:akka.stream.impl.fusing.ActorGraphInterpreter。每个连接都会创建一个新的此类对象。我曾经jmap计算对象的数量,下面提供了命令。我不确定在这里是否做错了什么。任何建议将不胜感激。

我有一个超级简单的echo websocket服务器来复制此观察结果:

package samples

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.StdIn

object AkkaWsExample {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  private val greeterWebSocketService = {
    Flow[Message]
      .collect {
        case tm: TextMessage =>
          println(s"Received $tm")
          TextMessage(Source.single("Hello ") ++ tm.textStream)
      }
  }

  def main(args: Array[String]): Unit = {

    //#websocket-routing
    val route =
      path("greeter") {
        get {
          handleWebSocketMessages(greeterWebSocketService)
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // for the future transformations
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
Run Code Online (Sandbox Code Playgroud)

然后,我使用任何方法连接到该服务器并断开连接,并运行jmap来计算对象计数,并注意到每个连接严格存在1个新对象。我也尝试了成千上万的连接,同样的事情发生了。

我使用以下命令来计算对象的数量:

jmap -histo:live [pid] | grep ActorGraphInterpreter

这是启动时以及打开和关闭1000个连接后的结果

ip-192-168-30-10:〜liuh $ jps | grep Akka | awk'{print $ 1}'| xargs jmap -histo:live | grep ActorGraphInt | 头-n1

701: 1 56 akka.stream.impl.fusing.ActorGraphInterpreter

ip-192-168-30-10:〜liuh $ jps | grep Akka | awk'{print $ 1}'| xargs jmap -histo:live | grep ActorGraphInt | 头-n1

119: 1001 56056 akka.stream.impl.fusing.ActorGraphInterpreter

您可以看到,对象数量严格增加了连接数。我确保我的客户端已断开连接-我关闭了进程,并验证netstat连接是否已关闭。

Ser*_*gGr 0

您可能没有考虑到 Scala 基于 JVM,而 JVM 使用垃圾收集器,而垃圾收集器又不是确定性的。特别是如果您没有产生足够的内存压力(与允许的内存限制相比),GC 可能根本不会运行。您可以通过强制 GC 轻松验证这个理论(这在生产中很可能很糟糕,但可以调试)。尝试在方法的开头添加main以下代码:

new Thread() {
  override def run(): Unit = {
    println("Start GC-thread")
    val start = System.currentTimeMillis()
    while (true) {
      Thread.sleep(1000)
      System.gc()
    }
  }
}.start()
Run Code Online (Sandbox Code Playgroud)

这段代码启动一个独立的线程,要求VM每秒进行一次GC。我敢打赌,使用这样的代码,您的测试将不会显示超过几个活动ActorGraphInterpreter对象。至少这是我在你的例子中看到的。如果您ActorGraphInterpreter在实际代码中仍然看到很多,则您的示例可能是一个不充分的MCVE,您应该发布一个更好的示例。