在akka-http路由中,我可以Future作为隐式转换为的响应返回ToResponseMarshaller.
有没有办法处理这个未来的超时?或路由级别的连接超时?或者一种方法是使用Await()功能?
现在客户端可以永远等待响应.
complete {
val future = for {
response <- someIOFunc()
entity <- someOtherFunc()
} yield entity
future.onComplete({
case Success(result) =>
HttpResponse(entity = HttpEntity(MediaTypes.`text/xml`, result))
case Failure(result) =>
HttpResponse(entity = utils.getFault("fault"))
})
future
}
Run Code Online (Sandbox Code Playgroud) 我有一个指令,定义为
def allowedRoles(roles: UserRole*)(implicit login: Login): Directive0 = ???
Run Code Online (Sandbox Code Playgroud)
但我似乎无法在不必明确传递login参数的情况下使用它
def myRoutes(implicit req: HttpRequest, login: Login) = {
path("example" / "path") {
get {
allowedRoles(Administrator) { // ? fails
handleGet
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果我尝试编译它失败,类型不匹配:
[error] found : akka.http.scaladsl.server.Route
[error] (which expands to) akka.http.scaladsl.server.RequestContext => scala.concurrent.Future[akka.http.scaladsl.server.RouteResult]
[error] required: com.example.Login
[error] allowedRoles(Administrator) { handleGet } }
Run Code Online (Sandbox Code Playgroud)
如果我将标记线更改为allowedRoles(Administrator)(login)然后它可以工作,但似乎我不应该这样做,我错过了什么?
我正在研究akka http
"com.typesafe.akka" %% "akka-actor" % "2.4.6",
"com.typesafe.akka" % "akka-http-experimental_2.11" % "2.4.6"
Run Code Online (Sandbox Code Playgroud)
我正在测试简单的代码,如下所示.我的主要问题是如何增强它以获得关闭连接的通知,那么我可以打印当前打开的连接数?
object StatsRepo{
val totConn = new AtomicInteger(0)
val currOpenConn = new AtomicInteger(0) // how to count this?
}
object Boot2 extends App{
implicit val system = ActorSystem("akka-http")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val requestHandler: HttpRequest => Future[HttpResponse] = {
// do some work here...
}
val serverSource = Http().bind("0.0.0.0", 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
StatsRepo.totConn.incrementAndGet()
connection handleWithAsyncHandler requestHandler …Run Code Online (Sandbox Code Playgroud) 由于文档尚未准备好,我将在这里询问 akka 维护人员。
为什么 akka-httpUnmarshaler返回Future[T]而不是T?这是我的目标。我想从 XML http 响应中解组类,类似于对 json 的处理方式。例如我想写
Unmarshal(HttpResponse.entity).to[Person]
Run Code Online (Sandbox Code Playgroud)
其中案例类及其解组器看起来像这样
case class Person(name: String, age: Int)
implicit val personUnmarshaller = Unmarshaller[NodeSeq, Person] { _ => xml =>
Future(Person((xml \\ "name").text, (xml \\ "age").text.toInt))
}
Run Code Online (Sandbox Code Playgroud)
它不会ScalaXmlSupport使用 1.0-RC4 提供的版本进行编译,因为Unmarshaller[ResponseEntity,Person]在范围内不可用。所以为了欺骗它我写了两个隐式转换
implicit def xmlUnmarshallerConverter[T](marsh: Unmarshaller[NodeSeq, T])(implicit mat: Materializer): FromEntityUnmarshaller[T] =
xmlUnmarshaller(marsh, mat)
implicit def xmlUnmarshaller[T](implicit marsh: Unmarshaller[NodeSeq, T], mat: Materializer): FromEntityUnmarshaller[T] =
defaultNodeSeqUnmarshaller.map(Unmarshal(_).to[T].value.get.get)
Run Code Online (Sandbox Code Playgroud)
它有效,但我不喜欢丑陋的.value.get.get。有没有更优雅的方法来实现这个?
调用singleRequest时,如何自定义连接池使用的执行上下文?我简要地看了一下代码,并且调用singleRequest导致一条消息被发送到PoolMasterActor,后者又向池接口actor发送消息.
我正在尝试使用Play和akka流创建一个简单的Websocket连接代理.交通流量是这样的:
(Client) request -> -> request (Server)
Proxy
(Client) response <- <- response (Server)
Run Code Online (Sandbox Code Playgroud)
我按照一些例子后提出了以下代码:
def socket = WebSocket.accept[String, String] { request =>
val uuid = UUID.randomUUID().toString
// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
source.toMat(sink)(Keep.both).run()
}
// sink that deals with the …Run Code Online (Sandbox Code Playgroud) 我正在使用带有队列的主机级API。
private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async
.viaMat(poolFlow)(Keep.both)
.toMat(
Sink.foreach({
case ((Success(resp), p)) =>
p.success(resp)
case ((Failure(e), p)) => p.failure(e)
})
)(Keep.left)
.run()
Run Code Online (Sandbox Code Playgroud)
我有很多请求在连接池中争夺连接,但出现以下错误:
java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request
at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at …Run Code Online (Sandbox Code Playgroud) 我有一个Web服务器,它接受传入的Websocket连接,并在Scala中用akka http实现。但是,我一直在观察应用程序的内存使用量的单调增加。经过长时间的挖掘,我发现每个内部连接都会创建一些内部Akka对象,但是在客户端断开连接后它们不会被清除。特别是此类:akka.stream.impl.fusing.ActorGraphInterpreter。每个连接都会创建一个新的此类对象。我曾经jmap计算对象的数量,下面提供了命令。我不确定在这里是否做错了什么。任何建议将不胜感激。
我有一个超级简单的echo websocket服务器来复制此观察结果:
package samples
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.StdIn
object AkkaWsExample {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
private val greeterWebSocketService = {
Flow[Message]
.collect {
case tm: TextMessage =>
println(s"Received $tm")
TextMessage(Source.single("Hello ") ++ tm.textStream)
}
}
def main(args: Array[String]): Unit = {
//#websocket-routing
val route =
path("greeter") {
get {
handleWebSocketMessages(greeterWebSocketService)
}
}
val bindingFuture …Run Code Online (Sandbox Code Playgroud) Is there a common approach to handle PATCH requests in REST API using circe library? By default, circe does not allow decoding partial JSON with only a part of the fields specified, i.e. it requires all fields to be set. You could use a withDefaults config, but it will be impossible to know if the field you received is null or just not specified. Here is a simplified sample of the possible solution. It uses Left[Unit] as a value to …
我刚开始学习 Scala、Akka Streams 和 Akka HTTP,所以如果问题太基本,请提前道歉。
我想在 HTTP 请求中执行一个 HTTP 请求,就像在下面的一段代码中一样:
implicit val system = ActorSystem("ActorSystem")
implicit val materializer = ActorMaterializer
import system.dispatcher
val requestHandler: Flow[HttpRequest, HttpResponse, _] = Flow[HttpRequest].map {
case HttpRequest(HttpMethods.GET, Uri.Path("/api"), _, _, _) =>
val responseFuture = Http().singleRequest(HttpRequest(uri = "http://www.google.com"))
responseFuture.onComplete {
case Success(response) =>
response.discardEntityBytes()
println(s"The request was successful")
case Failure(ex) =>
println(s"The request failed with: $ex")
}
//Await.result(responseFuture, 10 seconds)
println("Reached HttpResponse")
HttpResponse(
StatusCodes.OK
)
}
Http().bindAndHandle(requestHandler, "localhost", 8080)
Run Code Online (Sandbox Code Playgroud)
但在上述情况下,结果如下所示,这意味着Reached HttpResponse在完成请求之前首先到达:
Reached …Run Code Online (Sandbox Code Playgroud) akka-http ×10
scala ×9
akka ×5
akka-stream ×4
circe ×1
httprequest ×1
httpresponse ×1
spray ×1
spray-client ×1