使用 AKKA-HTTP 流式传输 CSV 源

Has*_*ara 1 scala mongodb akka-stream akka-http

我正在尝试使用reactivemongo-akkastream 0.12.1从Mongodb流式传输数据,并将结果返回到其中一个路由中的CSV流中(使用Akka-http)。我确实按照此处的示例实现了这一点:

http://doc.akka.io/docs/akka-http/10.0.0/scala/http/routing-dsl/source-streaming-support.html#simple-csv-streaming-example

它看起来工作正常。

我现在面临的唯一问题是如何将标题添加到输出 CSV 文件中。有任何想法吗?

谢谢

Mik*_*ame 5

除了该示例并不是真正强大的生成 CSV 的方法(不提供适当的转义)之外,您还需要对其进行一些修改以添加标题。这是我会做的:

  1. make aFlow将 a 转换Source[Tweet]为 CSV 行的源,例如 aSource[List[String]]
  2. 将它连接到一个包含你的标题的源作为一个 List[String]
  3. 调整编组器以呈现行源而不是推文

下面是一些示例代码:

case class Tweet(uid: String, txt: String)

def getTweets: Source[Tweet, NotUsed] = ???

val tweetToRow: Flow[Tweet, List[String], NotUsed] =
  Flow[Tweet].map { t =>
    List(
      t.uid,
      t.txt.replaceAll(",", "."))
  }

// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () =>
    ByteString(row.mkString(","))
  )
}

// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()

val route = path("tweets") {
  val headers = Source.single(List("uid", "text"))
  val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
  complete(headers.concat(tweets))
}
Run Code Online (Sandbox Code Playgroud)

更新:如果您的getTweets方法返回 aFuture您可以只映射其源值并以这种方式添加标题,例如:

val route = path("tweets") {
  val headers = Source.single(List("uid", "text"))
  val rows: Future[Source[List[String], NotUsed]] = getTweets
      .map(tweets => headers.concat(tweets.via(tweetToRow)))
  complete(rows)
}
Run Code Online (Sandbox Code Playgroud)