小编lai*_*ack的帖子

Akka Stream在Flow中使用HttpResponse

我想利用一个简单的Flow从http服务中收集一些额外的数据,并用结果增强我的数据对象.以下说明了这个想法:

val httpClient = Http().superPool[User]()

val cityRequest = Flow[User].map { user=>
  (HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}

val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), user) => user
  case (Success(resp), user) => {
    // << What to do here to get the value >> //
    val responseData = processResponseSomehowToGetAValue?
    val enhancedUser = new EnhancedUser(user.data, responseData)
    enhancedUser
  }
}

val processEnhancedUser = Flow[EnhancedUser].map {
  // e.g.: Asynchronously save user to a database
}

val useEnhancementGraph = userSource
  .via(getRequest)
  .via(httpClient)
  .via(getResponse)
  .via(processEnhancedUser)
  .to(Sink.foreach(println))
Run Code Online (Sandbox Code Playgroud)

我有一个问题需要理解Flow中流媒体性质和物化/期货之间的机制和差异.

以下想法没有向我解释: …

scala akka akka-stream akka-http

4
推荐指数
1
解决办法
994
查看次数

标签 统计

akka ×1

akka-http ×1

akka-stream ×1

scala ×1