我需要使用Akka的HTTP客户端(v2.0.2)来使用REST服务.逻辑方法是通过主机连接池执行此操作,因为我们期望大量的同时连接.在Flow此消耗(HttpRequest, T)和返回(Try[HttpResponse, T).该文件表明,一些任意类型T是需要管理的秩序回应潜在出请求,但没有指出什么主叫方应该使用返回的做T.
我的第一次尝试是使用Intas的下面的函数T.从许多地方调用它以确保连接使用单个池.
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
val unique = Random.nextInt
Source.single(req ? unique).via(pool).runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), `unique`) ? Future.successful(r)
case (Failure(f), `unique`) ? Future.failed(f)
case (_, i) ? Future.failed(new Exception("Return does not match the request"))
}
}
Run Code Online (Sandbox Code Playgroud)
问题是客户应该如何使用它T?有更清洁更有效的解决方案吗?最后,我的偏执是否可能无法实现偏执?
cmb*_*ter 24
我最初对此感到有些困惑,直到我几次阅读文档.如果要在池中使用单个请求,无论共享同一个池有多少个不同的位置T,您提供的(Int在您的情况下)并不重要.因此,如果您一直在使用Source.single,那么1如果您真的想要,那么该密钥总是如此.
但它确实发挥作用的方法是,如果一段代码将使用池并一次向池中提交多个请求,并希望得到所有这些请求的响应.原因是响应按照从被调用的服务接收的顺序返回,而不是它们被提供给池的顺序.每个请求可能需要不同的时间,因此它们Sink按照从池中收回的顺序向下游流动.
假设我们有一个服务,接受GET请求的形式为url:
/product/123
Run Code Online (Sandbox Code Playgroud)
其中123部分是,你要查找的产品的ID.如果我想1-10一次查找所有产品,并且每个产品都有单独的请求,那么这就是标识符变得重要的地方,这样我就可以将每个标识符HttpResponse与产品ID 相关联.此方案的简化代码示例如下:
val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut:Future[Map[Int,HttpResponse]] =
Source(requests).
via(pool).
runFold(Map.empty[Int,HttpResponse]){
case (m, (util.Success(resp), id)) =>
m ++ Map(id -> resp)
case (m, (util.Failure(ex), i)) =>
//Log a failure here probably
m
}
Run Code Online (Sandbox Code Playgroud)
当我得到我的回复时fold,我也方便地拥有每个与之关联的id,因此我可以将它们添加到我Map的id中.如果没有这个功能,我可能不得不做一些事情,比如解析正文(如果它是json)来尝试找出哪个响应是哪个并且这不是理想的,并且这不包括失败的情况.在这个解决方案中,我知道哪些请求失败了,因为我仍然得到了标识符.
我希望能为你澄清一些事情.
使用基于HTTP的资源时,Akka HTTP连接池是强大的盟友.如果您要一次执行单个请求,那么解决方案是:
def exec(req: HttpRequest): Future[HttpResponse] = {
Source.single(req ? 1)
.via(pool)
.runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), _) ? Future.successful(r)
case (Failure(f), _) ? Future.failed(f)
}
}
Run Code Online (Sandbox Code Playgroud)
因为您正在执行single请求,所以无需消除响应的歧义.然而,Akka流是聪明的.您可以同时向池提交多个请求.在这种情况下,我们传入一个Iterable[HttpRequest].返回的Iterable[HttpResponse]内容使用SortedMap与原始请求相同的顺序重新排序.你可以做一个request zip response排队的事情:
def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ? m + (idx ? Future.successful(r))
case (m, (Failure(e), idx)) ? m + (idx ? Future.failed(e))
}.map(r ? r.values)
}
Run Code Online (Sandbox Code Playgroud)
如果您需要以自己的方式解开东西,那么可迭代期货的期货很棒.通过扁平化事物可以获得更简单的响应.
def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ? m + (idx ? Future.successful(r))
case (m, (Failure(e), idx)) ? m + (idx ? Future.failed(e))
}.flatMap(r ? Future.sequence(r.values))
}
Run Code Online (Sandbox Code Playgroud)
我已经使用所有导入和包装器制作了这个要点,以使客户端使用HTTP服务.
特别感谢@cmbaxter他的简洁例子.