Nor*_*rwæ 2 scala connection-pooling akka-http
我正在尝试在我们的 akka-http 应用程序中使用客户端连接池。但是,一旦达到最大连接数,请求似乎就会挂起。我把问题精简成了下面这段可复现的代码:
import java.lang.Thread.UncaughtExceptionHandler
import java.net.ServerSocket
import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.http.scaladsl.client.RequestBuilding._
import scala.annotation.tailrec
import scala.util.{Success, Try}
/**
 * Minimal reproduction of a host connection pool that stalls once every
 * pooled connection has been handed out.
 *
 * A bare `ServerSocket` plays the server: each of four daemon threads accepts
 * a single connection and then writes the same canned, chunked 404 response in
 * an endless loop without ever reading a request. The client pushes 1000 GET
 * requests through a pool created with "max-connections: 4", collects the
 * results, prints a success/failure tally and finally shuts down the pool and
 * the actor system.
 *
 * NOTE(review): nothing in this program reads or discards the response
 * entities — presumably that is what keeps the pooled connections busy and
 * makes the run hang; confirm against the akka-http client documentation.
 */
object AkkaProblem extends App {

  // Port 0 lets the JDK pick a free port; the chosen one is read back below.
  val server = new ServerSocket(0)
  val serverPort = server.getLocalPort

  /** Fake HTTP server loop, also used as the (silent) uncaught-exception handler of its threads. */
  object responder extends Runnable with UncaughtExceptionHandler {
    val cr = '\r'

    // Raw HTTP/1.1 response with explicit CR before every line feed; the body
    // is a single chunk of 0x12 (18) bytes followed by the terminating 0-chunk.
    val httpResponse =
      s"""HTTP/1.1 404 Not Found$cr
         |Content-Type: application/json;charset=UTF-8$cr
         |Date: Mon, 26 Sep 2016 06:30:13 GMT$cr
         |Connection: keep-alive$cr
         |Transfer-Encoding: chunked$cr
         |$cr
         |12$cr
         |{"Hello": "World"}$cr
         |0$cr
         |$cr
         |""".stripMargin

    /** Accepts exactly one connection, then writes the canned response forever. */
    override final def run(): Unit = {
      val socket = server.accept()
      // Never returns normally; only an exception thrown by the write ends the loop.
      while (true) {
        socket.getOutputStream.write(httpResponse.getBytes)
      }
    }

    /** Deliberately ignores whatever terminated a responder thread. */
    override def uncaughtException(t: Thread, e: Throwable): Unit = ()
  }

  // Four responder threads, one per connection the pool is allowed to open.
  (1 to 4).foreach { nr =>
    val worker = new Thread(responder, s"response-thread-$nr")
    worker.setUncaughtExceptionHandler(responder)
    worker.setDaemon(true)
    worker.start()
  }

  implicit val system: ActorSystem = ActorSystem("main")
  import system.dispatcher
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val serverUri = Uri(s"http://localhost:$serverPort")
  val request = Get(serverUri)

  // Pool flow carrying a Unit correlation value alongside every request/response.
  val poolFlow: Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), Http.HostConnectionPool] =
    Http().newHostConnectionPool(
      serverUri.authority.host.address,
      serverUri.authority.port,
      ConnectionPoolSettings("max-connections: 4")
    )

  val source = Source.repeat(request).take(1000).map(req => (req, ()))
  val runRequest = source.viaMat(poolFlow)(Keep.right).toMat(Sink.seq)(Keep.both)
  val (connectionPool, response) = runRequest.run()

  response
    .map(pairs => pairs.map { case (result, _) => result })
    .andThen {
      case Success(results) =>
        // Tally of results keyed by whether the Try succeeded.
        val byResultType =
          results.groupBy(_.isSuccess).map { case (succeeded, group) => succeeded -> group.size }
        println(s"Received response. Got ${byResultType.get(true)} successes, ${byResultType.get(false)} errors")
        // Terminate the actor system once the pool shutdown completes, whatever its outcome.
        connectionPool.shutdown().andThen {
          case _ =>
            println("Connection pool shut down")
            system.terminate()
        }
    }
}
Run Code Online (Sandbox Code Playgroud)
我期望该程序能较快地报告 1000 次成功并退出。然而,它却无限期地挂起。只有把请求数减少到不超过允许的最大连接数时,问题才会消失。
作为一种变通办法,我们可以为每个连接单独使用一个池,但这样就完全失去了使用连接池的意义。
堆栈转储没有显示死锁或其他明显的错误行为:
Java HotSpot(TM)64位服务器VM警告:忽略选项MaxPermSize = 1000m;在8.0中删除了支持
2016-09-26 13:24:18
全线程转储Java HotSpot(TM)64位服务器VM(25.60-b23混合模式):
“附加侦听器”#25守护程序prio = 9 os_prio = 31 tid = 0x00007f86bf001000 nid = 0x3307等待条件[0x0000000000000000]
java.lang.Thread.State:RUNNABLE
“ main-akka.actor.default-dispatcher-10”#24 prio = 5 os_prio = 31 tid = 0x00007f86bbb4a000 nid = 0x6b03等待条件[0x000000011d717000]
java.lang.Thread.State:等待(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-akka.actor.default-dispatcher-9”#23 prio = 5 os_prio = 31 tid = 0x00007f86bc402800 nid = 0x6903等待条件[0x000000011d614000]
java.lang.Thread.State:等待(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-akka.actor.default-dispatcher-8”#22 prio = 5 os_prio = 31 tid = 0x00007f86bbb49800 nid = 0x6703等待条件[0x000000011d511000]
java.lang.Thread.State:等待(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-akka.actor.default-dispatcher-7”#21 prio = 5 os_prio = 31 tid = 0x00007f86bb292000 nid = 0x6503等待条件[0x000000011d40e000]
java.lang.Thread.State:等待(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-akka.io.pinned-dispatcher-6”#20 prio = 5 os_prio = 31 tid = 0x00007f86bcbcd000 nid = 0x6407可运行[0x000000011d10b000]
java.lang.Thread.State:RUNNABLE
在sun.nio.ch.KQueueArrayWrapper.kevent0(本机方法)
在sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
在sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
在sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
-锁定(sun.nio.ch.Util $ 2)
-锁定(java.util.Collections $ UnmodifiableSet)
-锁定(一个sun.nio.ch.KQueueSelectorImpl)
在sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
在sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
在akka.io.SelectionHandler $ ChannelRegistryImpl $$ anon $ 3.tryRun(SelectionHandler.scala:115)
在akka.io.SelectionHandler $ ChannelRegistryImpl $ Task.run(SelectionHandler.scala:219)
在akka.io.SelectionHandler $ ChannelRegistryImpl $$ anon $ 3.run(SelectionHandler.scala:148)
在akka.util.SerializedSuspendableExecutionContext.run $ 1(SerializedSuspendableExecutionContext.scala:67)
在akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:71)
在akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
在java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617)
在java.lang.Thread.run(Thread.java:745)
“ DestroyJavaVM”#19 prio = 5 os_prio = 31 tid = 0x00007f86bcba2800 nid = 0xd03等待条件[0x0000000000000000]
java.lang.Thread.State:RUNNABLE
“ main-akka.actor.default-dispatcher-5”#18 prio = 5 os_prio = 31 tid = 0x00007f86bc252800 nid = 0x5d03等待条件[0x000000011c488000]
java.lang.Thread.State:等待(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-akka.actor.default-dispatcher-4”#17 prio = 5 os_prio = 31 tid = 0x00007f86bc823800 nid = 0x5b03等待条件[0x000000011c185000]
java.lang.Thread.State:等待(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-akka.actor.default-dispatcher-3”#16 prio = 5 os_prio = 31 tid = 0x00007f86bba26000 nid = 0x5903等待条件[0x000000011c082000]
java.lang.Thread.State:TIMED_WAITING(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-akka.actor.default-dispatcher-2”#15 prio = 5 os_prio = 31 tid = 0x00007f86bc256000 nid = 0x5703等待条件[0x000000011bf7f000]
java.lang.Thread.State:等待(停车)
在sun.misc.Unsafe.park(本机方法)
-等待停车(akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinPool)
在scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
“ main-scheduler-1”#14 prio = 5 os_prio = 31 tid = 0x00007f86bc248800 nid = 0x5503等待条件[0x000000011b8d2000]
java.lang.Thread.State:TIMED_WAITING(正在休眠)
在java.lang.Thread.sleep(本机方法)
在akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:87)
在akka.actor.LightArrayRevolverScheduler $$ anon $ 4.nextTick(LightArrayRevolverScheduler.scala:268)
在akka.actor.LightArrayRevolverScheduler $$ anon $ 4.run(LightArrayRevolverScheduler.scala:238)
在java.lang.Thread.run(Thread.java:745)
“ response-thread-4”#13守护程序prio = 5 os_prio = 31 tid = 0x00007f86bc195000 nid = 0x5303可运行[0x000000011b7cf000]
java.lang.Thread.State:RUNNABLE
在java.net.SocketOutputStream.socketWrite0(本机方法)
在java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
在java.net.SocketOutputStream.write(SocketOutputStream.java:141)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .sendResponse $ 1(AkkaBugReport.scala:41)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .run(AkkaBugReport.scala:45)
在java.lang.Thread.run(Thread.java:745)
“ response-thread-3”#12守护程序prio = 5 os_prio = 31 tid = 0x00007f86bb0fa800 nid = 0x5103可运行[0x000000011b6cc000]
java.lang.Thread.State:RUNNABLE
在java.net.SocketOutputStream.socketWrite0(本机方法)
在java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
在java.net.SocketOutputStream.write(SocketOutputStream.java:141)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .sendResponse $ 1(AkkaBugReport.scala:41)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .run(AkkaBugReport.scala:45)
在java.lang.Thread.run(Thread.java:745)
“ response-thread-2”#11守护程序prio = 5 os_prio = 31 tid = 0x00007f86bb9ca000 nid = 0x4f03可运行[0x000000011b5c9000]
java.lang.Thread.State:RUNNABLE
在java.net.SocketOutputStream.socketWrite0(本机方法)
在java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
在java.net.SocketOutputStream.write(SocketOutputStream.java:141)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .sendResponse $ 1(AkkaBugReport.scala:41)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .run(AkkaBugReport.scala:45)
在java.lang.Thread.run(Thread.java:745)
“ response-thread-1”#10守护程序prio = 5 os_prio = 31 tid = 0x00007f86bb9c1000 nid = 0x4d03可运行[0x000000011b4c6000]
java.lang.Thread.State:RUNNABLE
在java.net.SocketOutputStream.socketWrite0(本机方法)
在java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
在java.net.SocketOutputStream.write(SocketOutputStream.java:141)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .sendResponse $ 1(AkkaBugReport.scala:41)
在de.fashionid.gatekeeper.AkkaBugReport $ responder $ .run(AkkaBugReport.scala:45)
在java.lang.Thread.run(Thread.java:745)
“监控器Ctrl-Break”#9守护程序prio = 5 os_prio = 31 tid = 0x00007f86bb078000 nid = 0x4b03可运行[0x000000011b101000]
java.lang.Thread.State:RUNNABLE
在java.net.PlainSocketImpl.socketAccept(本机方法)
在java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
在java.net.ServerSocket.implAccept(ServerSocket.java:545)
在java.net.ServerSocket.accept(ServerSocket.java:513)
在com.intellij.rt.execution.application.AppMain $ 1.run(AppMain.java:79)
在java.lang.Thread.run(Thread.java:745)
“服务线程”#8守护程序prio = 9 os_prio = 31 tid = 0x00007f86bc810800 nid = 0x4703可运行[0x0000000000000000]
java.lang.Thread.State:RUNNABLE
“ C1 CompilerThread2”#7守护程序prio = 9 os_prio = 31 tid = 0x00007f86bc805800 nid = 0x4503等待条件[0x0000000000000000]
java.lang.Thread.State:RUNNABLE
“ C2 CompilerThread1”#6守护程序prio = 9 os_prio = 31 tid = 0x00007f86bb866000 nid = 0x4303等待条件[0x0000000000000000]
java.lang.Thread.State:RUNNABLE
“ C2 CompilerThread0”#5守护程序prio = 9 os_prio = 31 tid = 0x00007f86bb833800 nid = 0x4103等待条件[0x0000000000000000]
java.lang.Thread.State:RUNNABLE
“信号调度程序”#4守护程序prio = 9 os_prio = 31 tid = 0x00007f86bb844000 nid = 0x3e23可运行[0x0000000000000000]
java.lang.Thread.State:RUNNABLE
Object.wait()中的“最终化器”#3守护程序prio = 8 os_prio = 31 tid = 0x00007f86bc001000 nid = 0x2b03 [0x0000000118c24000]
java.lang.Thread.State:等待(在对象监视器上)
在java.lang.Object.wait(本地方法)
在java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
-锁定(java.lang.ref.ReferenceQueue $ Lock)
在java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
在java.lang.ref.Finalizer $ FinalizerThread.run(Finalizer.java:209)
Object.wait()中的“引用处理程序”#2守护程序prio = 10 os_prio = 31 tid = 0x00007f86bb81f000 nid = 0x2903 [0x0000000118b21000]
java.lang.Thread.State:等待(在对象监视器上)
在java.lang.Object.wait(本地方法)
在java.lang.Object.wait(Object.java:502)
在java.lang.ref.Reference $ ReferenceHandler.run(Reference.java:157)
-锁定(java.lang.ref.Reference $ Lock)
“ VM线程” os_prio = 31 tid = 0x00007f86bb014800 nid = 0x2703可运行
“ GC任务线程#0(ParallelGC)” os_prio = 31 tid = 0x00007f86bc005000 nid = 0x1f03可运行
“ GC任务线程#1(ParallelGC)” os_prio = 31 tid = 0x00007f86bc005800 nid = 0x2103可运行
“ GC任务线程#2(ParallelGC)” os_prio = 31 tid = 0x00007f86bc006800 nid = 0x2303可运行
“ GC任务线程#3(ParallelGC)” os_prio = 31 tid = 0x00007f86bc007000 nid = 0x2503可运行
“ VM定期任务线程” os_prio = 31 tid = 0x00007f86bb80c000 nid = 0x4903等待条件
JNI全球参考:250
您需要显式地消费(consume)请求所返回的 HttpResponse 的实体(正文)。因为响应的实体实际上是一个流,如果您不消费它,连接就会一直保持打开状态。官方文档详细介绍了请求-响应周期。要么服务器在响应头中发送 Connection: close,要么您必须把实体流连接到某个 Sink(例如 Sink.ignore)来消费这个流。
实际上,有几种方法可以处理 HttpResponse。第一种是调用 HttpResponse 的 toStrict(timeout: FiniteDuration) 方法,它会读取整个实体并关闭连接;其中 timeout 限制了 HTTP 请求等待响应发送完毕的最长时间。如果您对实体内容不感兴趣,也可以在 HttpResponse 上调用 discardEntityBytes() 方法。最后,您还可以通过某个有效的 Sink 来消费该流,例如 Unmarshal(resp.entity).to[SomeClass]。