我正在尝试了解如何使用新akka.http库.我想向服务器发送一个http请求,并将整个响应主体作为单个String读取,以便生成一个Source[String,?].
这是迄今为止我能够制作的最佳解决方案:
def get(
modelID: String,
pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]
): Source[String,Unit] = {
val uri = reactionsURL(modelID)
val req = HttpRequest(uri = uri)
Source.single( (req,0) )
.via( pool )
.map {
case (Success(resp),_) =>
resp.entity.dataBytes.map( _.decodeString("utf-8") )
}.flatten(FlattenStrategy.concat)
.grouped( 1024 )
.map( _.mkString )
Run Code Online (Sandbox Code Playgroud)
它似乎工作得很好(除了缺少的错误路径),但对于这样简单的任务来说它有点笨拙.有更聪明的解决方案吗?我可以避免grouped/ mkString?
我想利用一个简单的Flow从http服务中收集一些额外的数据,并用结果增强我的数据对象.以下说明了这个想法:
val httpClient = Http().superPool[User]()
val cityRequest = Flow[User].map { user=>
(HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), user) => user
case (Success(resp), user) => {
// << What to do here to get the value >> //
val responseData = processResponseSomehowToGetAValue?
val enhancedUser = new EnhancedUser(user.data, responseData)
enhancedUser
}
}
val processEnhancedUser = Flow[EnhancedUser].map {
// e.g.: Asynchronously save user to a database
}
val useEnhancementGraph = userSource
.via(getRequest)
.via(httpClient)
.via(getResponse)
.via(processEnhancedUser)
.to(Sink.foreach(println))
Run Code Online (Sandbox Code Playgroud)
我有一个问题需要理解Flow中流媒体性质和物化/期货之间的机制和差异.
以下想法没有向我解释: …
我正在使用 akka.http.scaladsl.model.HttpResponse、HttpEntity。
得到 response 后,它是格式的 responseEntity 类型 (Content-type: 'application/json', {MyJSONHERE})。有没有办法从实体中提取我的json。
我尝试了 entity.getDataBytes ,它以 ByteString 格式提供实体的内容。我想正确读取 JSON 并解析它。有人可以指导我吗?