我在请求发送以下标题到我的阿卡-HTTP API: ,"Content-type": "application/json",."Accept": "application/json""AppId": "some_id"
如何在我的akka-http路由中获得"AppId"自定义标头?
(get & parameters("id")) { (id) =>
complete {
val appId = ?? // I want to get custom header here.
}
}
Run Code Online (Sandbox Code Playgroud)
谢谢.
我有一个客户端可以连接的WebSocket我也有一个使用akka-streams的数据流.如何使所有客户端获得相同的数据.目前他们似乎正在争夺数据.
谢谢
我正在使用akka和akka-http 2.4.2,我正在尝试了解它们的内部组件.
akka和akka-http用什么来启动休息网络服务?
它使用嵌入式Web服务?(像Jetty?)
我如何获得它的版本?
我开始运行其余网络服务的代码是:
implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()
val route: Route = {
blablabla ...
}
val bind = Http().bindAndHandle(route, "0.0.0.0", 8080)
Run Code Online (Sandbox Code Playgroud)
谢谢.
我使用的是akka-http,我的build.sbt配置是:
scalaVersion := "2.11.7"
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.11" % "2.4.2"
libraryDependencies += "com.typesafe.akka" % "akka-http-experimental_2.11" % "2.4.2"
libraryDependencies += "com.typesafe.akka" % "akka-http-spray-json-experimental_2.11" % "2.4.2"
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.11" % "2.4.2"
Run Code Online (Sandbox Code Playgroud)
我只暴露一个简单的REST api只有一个GET url
foo是一个返回Future的函数
implicit val actorSystem = ActorSystem("system", config)
implicit val actorMaterializer = ActorMaterializer()
val route: Route = {
get {
path("foo") {
complete { foo }
}
}
}
Run Code Online (Sandbox Code Playgroud)
Web服务预计会有很多调用,我想在发生故障时使服务变为冗余,因此我希望同时运行两个实例来处理所有请求.
1)在akka/akka-http中,有两个同时处理请求的Web服务的最佳方法是什么?有外部负载均衡器或者有一些魔法(我不知道)?
2)我需要调整哪些主要参数来改善性能?
我使用akka http客户端2.4.6将json发布到服务器(服务器要求消息的内容类型为applicaton / json才能处理):
val request = HttpRequest(uri = "http://localhost:9000/auth/add-user",
method = HttpMethods.POST,
entity = ByteString(write(createUser)))
.withHeaders(headers.`Content-Type`(ContentTypes.`application/json`))
Http().singleRequest(request)
Run Code Online (Sandbox Code Playgroud)
我收到此警告:
显式设置的HTTP标头'Content-Type:application / json'被忽略,
Content-Type不允许显式标头。HttpRequest.entity.contentType改为设置 。
服务器端的错误是:
415不支持的媒体类型
如何正确设置内容类型?
服务器代码:
object EchoService {
def route: Route = path("ws-echo") {
get {
handleWebSocketMessages(flow)
}
} ~ path("send-client") {
get {
sourceQueue.map(q => {
println(s"Offering message from server")
q.offer(BinaryMessage(ByteString("ta ta")))
} )
complete("Sent from server successfully")
}
}
val (source, sourceQueue) = {
val p = Promise[SourceQueue[Message]]
val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
p.trySuccess(m)
m
})
(s, p.future)
}
val flow =
Flow.fromSinkAndSourceMat(Sink.ignore, source)(Keep.right)
}
Run Code Online (Sandbox Code Playgroud)
客户代码:
object Client extends App {
implicit val actorSystem = ActorSystem("akka-system")
implicit val flowMaterializer = …Run Code Online (Sandbox Code Playgroud) 我有一个请求,其响应取决于演员回复.我试图以这种方式测试它:
val myActor:TestProbe = TestProbe()
val route = new MyRoute() {
override def myServiceActor:ActorRef = {
myActor.ref
}
}.route
"return a query result for GET" in {
Get("/foo") ~> route ~> check {
myActor.expectMsg(ExecuteFoo())
myActor.reply(FOO)
responseEntity shouldEqual toJsonEntity(RequestResult(FOO))
}
}
Run Code Online (Sandbox Code Playgroud)
我得到了正确expectMsg的验证,但是reply对responseEntity检查是异步的.在这种情况下,测试失败.
有没有办法等待回复?
我对akka-http很陌生,在同一条路线上并行运行查询时遇到麻烦。
我有一条路线可能会非常快(如果已缓存)或没有(大量CPU多线程计算)返回结果。我想并行运行这些查询,以防一小段查询经过一长段计算之后又到达,我不希望第二个调用等待第一个调用完成。
但是,如果这些查询在同一路由上,则似乎不会并行运行(如果在不同的路由上,则并行运行)
我可以在一个基本项目中复制它:
并行调用服务器3次(使用http:// localhost:8080 / test上的3个Chrome选项卡),响应分别达到3.0秒,6.0秒和9.0秒。我想查询不会并行运行。
在具有jdk 8的Windows 10上的6核(带有HT)计算机上运行。
build.sbt
name := "akka-http-test"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11"
Run Code Online (Sandbox Code Playgroud)
* AkkaHttpTest.scala **
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.{ExecutionContext, Future}
object AkkaHttpTest extends App {
implicit val actorSystem = ActorSystem("system") // no application.conf here
implicit val executionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(6))
implicit val actorMaterializer = ActorMaterializer()
val route = path("test") {
onComplete(slowFunc()) { slowFuncResult =>
complete(slowFuncResult) …Run Code Online (Sandbox Code Playgroud) 我想利用一个简单的Flow从http服务中收集一些额外的数据,并用结果增强我的数据对象.以下说明了这个想法:
val httpClient = Http().superPool[User]()
val cityRequest = Flow[User].map { user=>
(HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), user) => user
case (Success(resp), user) => {
// << What to do here to get the value >> //
val responseData = processResponseSomehowToGetAValue?
val enhancedUser = new EnhancedUser(user.data, responseData)
enhancedUser
}
}
val processEnhancedUser = Flow[EnhancedUser].map {
// e.g.: Asynchronously save user to a database
}
val useEnhancementGraph = userSource
.via(getRequest)
.via(httpClient)
.via(getResponse)
.via(processEnhancedUser)
.to(Sink.foreach(println))
Run Code Online (Sandbox Code Playgroud)
我有一个问题需要理解Flow中流媒体性质和物化/期货之间的机制和差异.
以下想法没有向我解释: …
我是Akka的新手,我想了解reference.conf和application.conf文件之间的区别?
使用它们的正确方法是什么?我应该在每个文件中包含哪些变量?
akka-http ×10
akka ×8
scala ×8
akka-stream ×4
json ×1
sbt ×1
server ×1
spray ×1
unit-testing ×1
websocket ×1