使用Akka Http转换Slick Streaming数据并发送Chunked Response

use*_*786 15 scala akka slick akka-stream akka-http

目的是从数据库流式传输数据,对此数据块执行一些计算(此计算返回某些案例类的Future),并将此数据作为分块响应发送给用户.目前,我能够流式传输数据并发送响应,而无需执行任何计算.但是,我无法执行此计算,然后流式传输结果.

这是我实施的路线.

def streamingDB1 =
path("streaming-db1") {
  get {
    val src = Source.fromPublisher(db.stream(getRds))
    complete(src)
  }
}
Run Code Online (Sandbox Code Playgroud)

函数getRds返回映射到case类的表的行(使用slick).现在考虑函数compute,它将每行作为输入并返回另一个案例类的Future.就像是

def compute(x: Tweet) : Future[TweetNew] = ?
Run Code Online (Sandbox Code Playgroud)

如何在变量src上实现此函数,并将此计算的分块响应(作为流)发送给用户.

Jef*_*ung 7

您可以使用mapAsync以下方法转换源:

val src =
  Source.fromPublisher(db.stream(getRds))
        .mapAsync(parallelism = 3)(compute)

complete(src)
Run Code Online (Sandbox Code Playgroud)

根据需要调整并行度.


请注意,您可能需要配置Slick文档中提到的一些设置:

注意:某些数据库系统可能需要以某种方式设置会话参数以支持流式传输,而不会在客户端的内存中同时缓存所有数据.例如,PostgreSQL需要.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)(具有所需的页面大小n)和.transactionally正确的流媒体.

因此,如果您正在使用PostgreSQL,那么您Source可能会看起来像下面这样:

val src =
  Source.fromPublisher(
    db.stream(
      getRds.withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      ).transactionally
    )
  ).mapAsync(parallelism = 3)(compute)
Run Code Online (Sandbox Code Playgroud)