标签: akka-http

如何处理响应超时?

在akka-http路由中,我可以Future作为隐式转换为的响应返回ToResponseMarshaller.

有没有办法处理这个未来的超时?或路由级别的连接超时?或者一种方法是使用Await()功能?

现在客户端可以永远等待响应.

complete {
   val future = for {
     response <- someIOFunc()
     entity <- someOtherFunc()
   } yield entity
   future.onComplete({
     case Success(result) =>
       HttpResponse(entity = HttpEntity(MediaTypes.`text/xml`, result))
     case Failure(result) =>
       HttpResponse(entity = utils.getFault("fault"))
   })
   future
 }
Run Code Online (Sandbox Code Playgroud)

scala akka-http

6
推荐指数
1
解决办法
2461
查看次数

在定义路由指令时是否可以使用隐式参数?

我有一个指令,定义为

def allowedRoles(roles: UserRole*)(implicit login: Login): Directive0 = ???
Run Code Online (Sandbox Code Playgroud)

但我似乎无法在不必明确传递login参数的情况下使用它

def myRoutes(implicit req: HttpRequest, login: Login) = {
  path("example" / "path") {
    get {
      allowedRoles(Administrator) { // ? fails 
        handleGet
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

如果我尝试编译它失败,类型不匹配:

[error]  found   : akka.http.scaladsl.server.Route
[error]     (which expands to)  akka.http.scaladsl.server.RequestContext => scala.concurrent.Future[akka.http.scaladsl.server.RouteResult]
[error]  required: com.example.Login
[error]         allowedRoles(Administrator) { handleGet } }
Run Code Online (Sandbox Code Playgroud)

如果我将标记线更改为allowedRoles(Administrator)(login)然后它可以工作,但似乎我不应该这样做,我错过了什么?

scala akka-http

6
推荐指数
1
解决办法
644
查看次数

我如何计算akka当前的开放连接数

我正在研究akka http

  "com.typesafe.akka" %% "akka-actor" % "2.4.6",
  "com.typesafe.akka" % "akka-http-experimental_2.11" % "2.4.6"
Run Code Online (Sandbox Code Playgroud)

我正在测试简单的代码,如下所示.我的主要问题是如何增强它以获得关闭连接的通知,那么我可以打印当前打开的连接数?

object StatsRepo{
 val totConn = new AtomicInteger(0)
 val currOpenConn = new AtomicInteger(0) // how to count this?
}

object Boot2 extends App{
   implicit val system = ActorSystem("akka-http")
   implicit val materializer = ActorMaterializer()
   implicit val executionContext = system.dispatcher

   val requestHandler: HttpRequest => Future[HttpResponse] = {
       // do some work here...
   }
   val serverSource = Http().bind("0.0.0.0", 8080)


  val bindingFuture: Future[Http.ServerBinding] =
    serverSource.to(Sink.foreach { connection =>
      StatsRepo.totConn.incrementAndGet()
     connection handleWithAsyncHandler requestHandler …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-http

6
推荐指数
1
解决办法
366
查看次数

为什么 akka-http Unmarshaler 返回 Future[T] 而不是 T?

由于文档尚未准备好,我将在这里询问 akka 维护人员。

为什么 akka-httpUnmarshaler返回Future[T]而不是T?这是我的目标。我想从 XML http 响应中解组类,类似于对 json 的处理方式。例如我想写

Unmarshal(HttpResponse.entity).to[Person]
Run Code Online (Sandbox Code Playgroud)

其中案例类及其解组器看起来像这样

case class Person(name: String, age: Int)

implicit val personUnmarshaller = Unmarshaller[NodeSeq, Person] { _ => xml =>
      Future(Person((xml \\ "name").text, (xml \\ "age").text.toInt))
}
Run Code Online (Sandbox Code Playgroud)

它不会ScalaXmlSupport使用 1.0-RC4 提供的版本进行编译,因为Unmarshaller[ResponseEntity,Person]在范围内不可用。所以为了欺骗它我写了两个隐式转换

implicit def xmlUnmarshallerConverter[T](marsh: Unmarshaller[NodeSeq, T])(implicit mat: Materializer): FromEntityUnmarshaller[T] =
  xmlUnmarshaller(marsh, mat)

implicit def xmlUnmarshaller[T](implicit marsh: Unmarshaller[NodeSeq, T], mat: Materializer): FromEntityUnmarshaller[T] =
  defaultNodeSeqUnmarshaller.map(Unmarshal(_).to[T].value.get.get)
Run Code Online (Sandbox Code Playgroud)

它有效,但我不喜欢丑陋的.value.get.get。有没有更优雅的方法来实现这个?

scala akka spray spray-client akka-http

5
推荐指数
1
解决办法
1517
查看次数

如何自定义Akka Http Client执行上下文

调用singleRequest时,如何自定义连接池使用的执行上下文?我简要地看了一下代码,并且调用singleRequest导致一条消息被发送到PoolMasterActor,后者又向池接口actor发送消息.

  1. 每个连接是阻塞还是非阻塞?
  2. 哪个上下文用于连接池?(我想确保我的HTTP请求不会阻止所有线程)

akka-http

5
推荐指数
1
解决办法
601
查看次数

使用Play 2.6和akka流的Websocket代理

我正在尝试使用Play和akka流创建一个简单的Websocket连接代理.交通流量是这样的:

(Client) request  ->         -> request (Server)
                      Proxy 
(Client) response <-         <- response (Server)
Run Code Online (Sandbox Code Playgroud)

我按照一些例子后提出了以下代码:

def socket = WebSocket.accept[String, String] { request =>

val uuid = UUID.randomUUID().toString

// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
  val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
  val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
  source.toMat(sink)(Keep.both).run()
}

// sink that deals with the …
Run Code Online (Sandbox Code Playgroud)

scala akka playframework akka-stream akka-http

5
推荐指数
1
解决办法
1077
查看次数

如何启用 Source.Queue 背压

我正在使用带有队列的主机级API

  private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async
    .viaMat(poolFlow)(Keep.both)
    .toMat(
      Sink.foreach({
        case ((Success(resp), p)) =>
          p.success(resp)
        case ((Failure(e), p)) => p.failure(e)
      })
    )(Keep.left)
    .run()
Run Code Online (Sandbox Code Playgroud)

我有很多请求在连接池中争夺连接,但出现以下错误:

 java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request
    at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
    at akka.actor.ActorCell.invoke(ActorCell.scala:496)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream akka-http

5
推荐指数
1
解决办法
1304
查看次数

带Web套接字的Akka HTTP内存泄漏

我有一个Web服务器,它接受传入的Websocket连接,并在Scala中用akka http实现。但是,我一直在观察应用程序的内存使用量的单调增加。经过长时间的挖掘,我发现每个内部连接都会创建一些内部Akka对象,但是在客户端断开连接后它们不会被清除。特别是此类:akka.stream.impl.fusing.ActorGraphInterpreter。每个连接都会创建一个新的此类对象。我曾经jmap计算对象的数量,下面提供了命令。我不确定在这里是否做错了什么。任何建议将不胜感激。

我有一个超级简单的echo websocket服务器来复制此观察结果:

package samples

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.StdIn

object AkkaWsExample {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  private val greeterWebSocketService = {
    Flow[Message]
      .collect {
        case tm: TextMessage =>
          println(s"Received $tm")
          TextMessage(Source.single("Hello ") ++ tm.textStream)
      }
  }

  def main(args: Array[String]): Unit = {

    //#websocket-routing
    val route =
      path("greeter") {
        get {
          handleWebSocketMessages(greeterWebSocketService)
        }
      }

    val bindingFuture …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream akka-http

5
推荐指数
1
解决办法
242
查看次数

Handling PATCH requests with Akka HTTP and circe for nullable fields

Is there a common approach to handle PATCH requests in REST API using circe library? By default, circe does not allow decoding partial JSON with only a part of the fields specified, i.e. it requires all fields to be set. You could use a withDefaults config, but it will be impossible to know if the field you received is null or just not specified. Here is a simplified sample of the possible solution. It uses Left[Unit] as a value to …

scala akka-http circe

5
推荐指数
1
解决办法
420
查看次数

Scala 和 Akka HTTP:请求中的请求和线程问题

我刚开始学习 Scala、Akka Streams 和 Akka HTTP,所以如果问题太基本,请提前道歉。

我想在 HTTP 请求中执行一个 HTTP 请求,就像在下面的一段代码中一样:

  implicit val system = ActorSystem("ActorSystem")
  implicit val materializer = ActorMaterializer
  import system.dispatcher

  val requestHandler: Flow[HttpRequest, HttpResponse, _] = Flow[HttpRequest].map {
    case HttpRequest(HttpMethods.GET, Uri.Path("/api"), _, _, _) =>
      val responseFuture = Http().singleRequest(HttpRequest(uri = "http://www.google.com"))
      responseFuture.onComplete {
        case Success(response) =>
          response.discardEntityBytes()
          println(s"The request was successful")
        case Failure(ex) =>
          println(s"The request failed with: $ex")
      }
      //Await.result(responseFuture, 10 seconds)
      println("Reached HttpResponse")
      HttpResponse(
        StatusCodes.OK
      )
  }

  Http().bindAndHandle(requestHandler, "localhost", 8080)  
Run Code Online (Sandbox Code Playgroud)

但在上述情况下,结果如下所示,这意味着Reached HttpResponse在完成请求之前首先到达:

Reached …
Run Code Online (Sandbox Code Playgroud)

scala httpresponse httprequest akka-stream akka-http

5
推荐指数
1
解决办法
252
查看次数