Akka HTTP Streaming JSON反序列化

Mar*_*ijn 8 json akka akka-stream akka-http

是否有可能从Akka HTTP 动态反序列化未知长度的外部ByteString流到域对象?


上下文

我称一个无限长的HTTP端点输出一个JSON Array不断增长的端点:

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight
Run Code Online (Sandbox Code Playgroud)

Dmi*_*ach 5

我想JsonFraming.objectScanner(Int.MaxValue)在这种情况下应该使用它。正如文档所述:

返回一个 Flow,该 Flow 实现基于“括号计数”的成帧运算符,以发出有效的 JSON 块。它扫描传入数据流中的有效 JSON 对象,并返回仅包含这些有效块的 ByteString 块。人们可能想要使用此运算符构建数据的典型示例包括:非常大的数组

所以你最终可以得到这样的结果:

val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))

response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(_.utf8String)         // In case you have ByteString
      .map(decode[MyEntity](_))  // Use any Unmarshaller here
      .grouped(20)
      .runWith(Sink.ignore)      // Do whatever you need here 
  case Failure(exception) => log.error(exception, "Api call failed")
}
Run Code Online (Sandbox Code Playgroud)