Tim*_*Tim 11 scala akka-stream akka-http
TLDR:当我有一个传出的http请求作为流的一部分时,是否更好地为每个请求实现一个流(即使用短期流)或跨请求使用单个流实现?
详细信息:我有一个典型的服务,它接受HTTP请求,将其分散到多个第三方下游服务(不受我控制),并在发送回来之前聚合结果.我正在使用akka-http进行客户端实现并为服务器进行喷涂(遗留,随着时间推移将转移到akka-http).示意图:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
这可以通过为每个请求实现流或实现(部分)流一次并在请求之间共享来实现.
每个请求的实现产生实现开销1,并且不清楚如何利用连接池.这里描述了这个问题(许多实现可能会耗尽池).我可以像这里一样在长时间运行的http流中包装一个池并包装在mapAsync"上游"中,但错误处理策略对我来说并不清楚.当单个请求失败并且流终止时,它是否会取消池?此外,似乎我需要协调请求和响应,因为它们不会按顺序返回.
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})
Run Code Online (Sandbox Code Playgroud)
跨请求共享流有一个类似的错误处理问题 - 似乎有一些失败模式可以使所有请求在飞行中关闭该流.代码类似于主机级API,但是队列面向整个流.
在这种情况下哪种方式更好?
我确实试图实现这两种解决方案,但是在每个实施阶段都有很多设计选择,因此即使在"正确"的路径上也很容易搞砸.
1虽然我认为它可以忽略不计,但它与akka-http服务器的运行方式相同.
一般来说,最好使用单个连接Flow并通过该单个流分派所有请求。主要原因是新的物化实际上可能会导致Connection每次形成新的物化(取决于您的连接池设置)。
你是对的,这会导致一些并发症:
排序:通过提供随机值UUID作为传递给连接流的元组中的第二个值,您将消除将请求与响应关联起来的能力。元组中的额外T值可以用作“相关 ID”,以了解HttpResponse您从 Flow 中获得的值。在您的特定示例中,您可以使用Int您Range创建的首字母:
val responseSource : Source[(Try[HttpResponse], Int), _] =
Source
.fromIterator( () => Iterator range (0,5) )
.map(i => HttpRequest(...) -> i)
.via(connectionFlow)
Run Code Online (Sandbox Code Playgroud)
现在,每个响应都带有原始 Int 值,您可以使用它来处理响应。
错误处理:您所说的“单个请求失败并且流被终止”是不正确的。单个请求失败并不一定会导致流失败。相反,您只需(Failure(exception), Int)从连接流中获取一个值。您现在知道哪个 Int 导致了失败,并且您从流程中获得了异常。
| 归档时间: |
|
| 查看次数: |
445 次 |
| 最近记录: |