链中的Akka-http-client链请求

Rab*_*bzu 8 scala akka-stream akka-http

我想使用akka-http-client链接http请求作为Stream.链中的每个http请求都取决于先前请求的成功/响应,并使用它来构造新请求.如果请求不成功,则Stream应返回不成功请求的响应.

如何在akka-http中构建这样的流?我应该使用哪个akka-http客户端级API?

Mik*_*ame 11

如果您正在制作网络抓取工具,请查看此帖子.此答案解决了一个更简单的情况,例如下载分页资源,其中指向下一页的链接位于当前页面响应的标题中.

您可以使用该Source.unfoldAsync方法创建链式源 - 其中一个项目指向下一个项目.这需要一个函数,它接受一个元素S并返回Future[Option[(S, E)]]以确定流是否应该继续发出类型的元素E,将状态传递给下一个调用.

在你的情况下,这有点像:

  1. 初步的 HttpRequest
  2. 生产一个 Future[HttpResponse]
  3. 如果响应指向另一个URL,则返回Some(request -> response),否则返回None

然而,有一个皱纹,即如果它不包含指向下一个请求的指针,它将不会从流中发出响应.

为了解决这个问题,您可以将函数传递给unfoldAsyncreturn Future[Option[(Option[HttpRequest], HttpResponse)]].这允许您处理以下情况:

  • 当前的响应是一个错误
  • 当前响应指向另一个请求
  • 当前响应不指向另一个请求

接下来是一些带注释的代码,它概述了这种方法,但首先是初步的:

当将HTTP请求流式传输到Akka流中的响应时,您需要确保消耗响应主体,否则会发生坏事(死锁等).如果您不需要主体,则可以忽略它,但在这里我们使用用于将HttpEntity(潜在)流转换为严格实体的函数:

import scala.concurrent.duration._

def convertToStrict(r: HttpResponse): Future[HttpResponse] =
  r.entity.toStrict(10.minutes).map(e => r.withEntity(e))
Run Code Online (Sandbox Code Playgroud)

接下来,Option[HttpRequest]从一个创建一个函数HttpResponse.此示例使用类似Github的分页链接的方案,其中Links标头包含,例如<https://api.github.com/...> rel="next"::

def nextUri(r: HttpResponse): Seq[Uri] = for {
  linkHeader <- r.header[Link].toSeq
  value <- linkHeader.values
  params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri

def getNextRequest(r: HttpResponse): Option[HttpRequest] =
  nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
Run Code Online (Sandbox Code Playgroud)

接下来,我们将传递给真正的函数unfoldAsync.它使用Akka HTTP Http().singleRequest()API来获取HttpRequest并生成Future[HttpResponse]:

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
  reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
      // handle the error case. Here we just return the errored response
      // with no next item.
      if (response.status.isFailure()) Future.successful(Some(None -> response))

      // Otherwise, convert the response to a strict response by
      // taking up the body and looking for a next request.
      else convertToStrict(response).map { strictResponse =>
        getNextRequest(strictResponse) match {
          // If we have no next request, return Some containing an
          // empty state, but the current value
          case None => Some(None -> strictResponse)

          // Otherwise, pass on the request...
          case next => Some(next -> strictResponse)
        }
      }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
  }
Run Code Online (Sandbox Code Playgroud)

请注意,如果我们收到错误响应,则流将不会继续,因为我们将返回None下一个状态.

您可以调用它来获取HttpResponse对象流,如下所示:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests)
Run Code Online (Sandbox Code Playgroud)

至于返回最后一个(或错误的)响应的值,您只需要使用Sink.last,因为流将在成功完成时或在第一个错误响应时结束.例如:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
      Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)
Run Code Online (Sandbox Code Playgroud)