传出连接流已关闭

alm*_*dar 6 akka-http

我有一个行为的演员:

def receive: Receive = {
    case Info(message) =>
      val res = send("INFO:"  + message)
      installAckHook(res)
    case Warning(message) =>
      val res = send("WARNING:"  + message)
      installAckHook(res)
    case Error(message) =>
      val res = send("ERROR:" + message)
      installAckHook(res)
  }

 private def installAckHook[T](fut: Future[T]): Unit = {
    val answerTo = sender()

    fut.onComplete {
      case Success(_) => answerTo ! "OK"
      case Failure(ex) => answerTo ! ex
    }
  }


  private def send(message: String): Future[HttpResponse] = {
    import context.system
    val payload: Payload = Payload(text = message,
      username = slackConfig.username, icon_url = slackConfig.iconUrl,
      icon_emoji = slackConfig.iconEmoji, channel = slackConfig.channel)
      .validate
    Http().singleRequest(RequestBuilding.Post(slackConfig.hookAddress, payload))
  }
Run Code Online (Sandbox Code Playgroud)

并且测试

val actorRef = system.actorOf(SlackHookActor.props(SlackEndpointConfig(WebHookUrl,iconEmoji = Some(":ghost:"))))
actorRef ! Error("Some error message")
actorRef ! Warning("Some warning message")
actorRef ! Info("Some info message")
receiveN(3)
Run Code Online (Sandbox Code Playgroud)

并且在afterAll()方法中我使用了关闭actor系统TestKit.

它工作,请求到达服务器,但akka流部分有错误:

[ERROR] [06/26/2015 11:34:55.118] [SlackHookTestingSystem-akka.actor.default-dispatcher-10] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)
[ERROR] [06/26/2015 11:34:55.120] [SlackHookTestingSystem-akka.actor.default-dispatcher-13] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)
[ERROR] [06/26/2015 11:34:55.121] [SlackHookTestingSystem-akka.actor.default-dispatcher-8] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)
Run Code Online (Sandbox Code Playgroud)

似乎因为我有一个未来完成传出连接应该已经关闭,所以这是一个错误或我错过了吗?

Som*_*tik 3

您还需要关闭 http 连接池,例如

Http().shutdownAllConnectionPools().onComplete{ _ =>
  system.shutdown()
}
Run Code Online (Sandbox Code Playgroud)

也许 akka http testkit 提供了一些帮助

  • 这就开始产生一个问题:什么应该关闭什么?我们有连接池、参与者系统和参与者实现器。所以它应该像关闭http池->关闭物化->关闭系统? (5认同)