lai*_*ack 4 scala akka akka-stream akka-http
我想利用一个简单的Flow从http服务中收集一些额外的数据,并用结果增强我的数据对象.以下说明了这个想法:
val httpClient = Http().superPool[User]()
val cityRequest = Flow[User].map { user=>
(HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), user) => user
case (Success(resp), user) => {
// << What to do here to get the value >> //
val responseData = processResponseSomehowToGetAValue?
val enhancedUser = new EnhancedUser(user.data, responseData)
enhancedUser
}
}
val processEnhancedUser = Flow[EnhancedUser].map {
// e.g.: Asynchronously save user to a database
}
val useEnhancementGraph = userSource
.via(getRequest)
.via(httpClient)
.via(getResponse)
.via(processEnhancedUser)
.to(Sink.foreach(println))
Run Code Online (Sandbox Code Playgroud)
我有一个问题需要理解Flow中流媒体性质和物化/期货之间的机制和差异.
以下想法没有向我解释:
如何从响应中获取值到新用户对象中,因此我可以在以下步骤中处理该对象.
感谢帮助.
更新:
我正在使用远程akka http服务器评估代码,使用下面的代码在解析之前立即和10秒之间回答请求.这导致一些"EnhancedUser"实例最终出现,但那些花了太长时间才回答的人错过了他们的价值观.
我在某个时间将.async添加到cityResponse解析器的末尾,结果输出花费的时间更长,但是正确.
这种行为的原因是什么?它与接受的答案如何契合?
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), member) => member
case (Success(response), member) => {
Unmarshal(response.entity).to[String] onComplete {
case Success(s) => member.city = Some(s)
case Failure(ex) => member.city = None
}
}
member
}.async // <<-- This changed the behavior to be correct, why?
Run Code Online (Sandbox Code Playgroud)
您可以使用两种不同的策略,具体取决于您从"cityRequestEndpoint"获得的实体的性质:
基于流
处理这种情况的典型方法是始终假设来自源端点的实体可以包含N个数据,其中N是事先未知的.这通常是要遵循的模式,因为它是现实世界中最通用的,因此是"最安全的".
第一步是将HttpResponse来自端点的转换为数据源:
val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] =
(response, user) => response match {
case Failure(_) => Source single (None -> user)
case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
}
Run Code Online (Sandbox Code Playgroud)
上面的代码是我们不假设N的大小,r.entity.dataBytes可能是0 ByteString值的源,或可能是无限数值.但我们的逻辑并不关心!
现在我们需要组合来自Source的数据.这是Flow.flatMapConcat的一个很好的用例,它接受一个源流并将其转换为值的流(类似于Iterables的flatMap):
val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] =
Flow[(Try[HttpResponse], User)] flatMapConcat convertResponseToByteStrSource
Run Code Online (Sandbox Code Playgroud)
剩下要做的就是将元组转换(ByteString, User)成EnhancedUser.注意:我假设下面是从问题逻辑中推断出User的子类EnhancedUser:
val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser =
(byteStr, user) =>
byteStr
.map(s => EnhancedUser(user.data, s))
.getOrElse(user)
val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] =
Flow[(ByteString, User)] map convertByteStringToUser
Run Code Online (Sandbox Code Playgroud)
现在可以组合这些组件:
val useEnhancementGraph =
userSource
.via(cityRequest)
.via(httpClient)
.via(cityByteStrFlow)
.via(cityUserFlow)
.via(processEnhancedUser)
.to(Sink foreach println)
Run Code Online (Sandbox Code Playgroud)
未来基础
我们可以使用Futures来解决问题,类似于您在原始问题中引用的堆栈问题.我不推荐这种方法有两个原因:
EnhancedUser.Async.await(几乎总是应该避免).要使用基于Future的方法,对原始代码的唯一重大改变是使用Flow.mapAsync而不是Flow.map处理Future在函数中创建a的事实:
val parallelism = 10
val timeout : FiniteDuration = ??? //you need to specify the timeout limit
val convertResponseToFutureByteStr : (Try[HttpResponse], User) => Future[EnhancedUser] =
_ match {
case (Failure(ex), user) =>
Future successful user
case (Success(resp), user) =>
resp
.entity
.toStrict(timeout)
.map(byteStr => new EnhancedUser(user.data, byteStr))
}
val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
994 次 |
| 最近记录: |