如何在websockets中始终保持连接打开状态

inv*_*ant 4 scala websocket akka-http

服务器代码:

object EchoService {

  def route: Route = path("ws-echo") {
    get {
      handleWebSocketMessages(flow)
    }
  } ~ path("send-client") {
    get {
      sourceQueue.map(q => {
        println(s"Offering message from server")
        q.offer(BinaryMessage(ByteString("ta ta")))
      } )
      complete("Sent from server successfully")
    }
  }

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    })
    (s, p.future)
  }

  val flow =
    Flow.fromSinkAndSourceMat(Sink.ignore, source)(Keep.right)
}
Run Code Online (Sandbox Code Playgroud)

客户代码:

object Client extends App {


  implicit val actorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer = ActorMaterializer()

  val config = actorSystem.settings.config
  val interface = config.getString("app.interface")

  val port = config.getInt("app.port")


  // print each incoming strict text message
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)

      case _ => println(s"received unknown message format")
    }

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    })
    (s, p.future)
  }

  val flow =
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.right)


  val (upgradeResponse, sourceClosed) =
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)

  val connected = upgradeResponse.map { upgrade =>
    // just like a regular http request we can get 404 NotFound,
    // with a response body, that will be available from upgrade.response
    if (upgrade.response.status == StatusCodes.SwitchingProtocols || upgrade.response.status == StatusCodes.OK ) {
      Done
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

  connected.onComplete(println)

}
Run Code Online (Sandbox Code Playgroud)

当我点击http://localhost:8080/send-client我看到消息来到客户端但过了一段时间如果尝试再次发送给客户端我没有在客户端看到任何消息:s.我也试过source.concatMat(Source.maybe)(Keep.right)但没有运气:(

编辑:我用js客户端测试,不知何故连接/流量在服务器端关闭,反正有没有阻止这个?以及如何在使用akka-http websocket客户端时监听此事件:s

inv*_*ant 5

嗨,

它之所以没有保持连接,是因为默认情况下,所有HTTP连接都默认启用idle-timeout,以防止系统在没有任何信号的情况下消失而导致系统泄漏.

克服此限制(实际上是我推荐的方法)的一种方法是在客户端注入保持活动消息(服务器否则忽略的消息,但通知底层HTTP服务器连接仍然存在).

您可以将HTTP服务器配置中的空闲超时覆盖为更大的值,但我不建议这样做.

如果您使用的是基于流的客户端,则在必要时注入心跳就像调用keepAlive并为其提供时间间隔和工厂一样简单,您需要注入的消息:http: //doc.akka.io/api/akka/2.4. 7/index.html#akka.stream.scaladsl.Flow@keepAlive U>:Out:FlowOps.this.Repr [U]

组合器将确保没有超过T的句点将保持沉默,因为它将注入元素以在必要时保留此合同(并且如果有足够的后台流量则不会注入任何内容)

-Endre

谢谢你Endre :),工作片段..

// on client side 

 val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    }).keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat"))
    (s, p.future)
  }
Run Code Online (Sandbox Code Playgroud)