如何异步流式传输/分块数据库结果?

nic*_*bot 3 scala playframework playframework-2.2

我想使用Enumerator(类似于ScalaStream示例)将数据库结果块化为响应为TSV.我可能有数千行,我不想分页结果,也不想将整个行累积ResultSet到一个单独的行中String.如果需要,块可以不在行分隔符处中断.换句话说,唯一的标准是对结果进行分块,而不是遵循TSV分隔符.

我想在我的行动中做这样的事情:

Ok.stream(new Iterator[ResultSet] {
  val conn = DB.getConnection()
  val stmt = conn.createStatement
  val rs = stmt.executeQuery("""select * from atable""")
  def hasNext = !rs.isLast() && !rs.isClosed()
  def next() = {
    if (!rs.next()) {
      conn.close()
      null
    } else rs
  }
}.toStream)
Run Code Online (Sandbox Code Playgroud)

不幸的是,Ok.stream期望java.io.InputStream并且将结果包装起来InputStream似乎过度了.Ok.stream也接受了,Enumerator但我不确定如何在这种情况下创建一个.

Jam*_*per 10

首先,你不能"异步",因为JDBC API是同步的 - 当你调用时rs.next(),它将同步阻塞.但是没关系,只需要确保你的线程池被调整为允许阻塞操作(即,使它们变大).Play有一个异步流API(与您最常用的同步InputStream/OutputStream完全不同),它使用称为iteratees/enumerators的东西.您想要创建枚举ResultSet的枚举器.基本上,你想做一些看起来像这样的事情:

import play.api.libs.iteratee._

val conn = DB.getConnection()
val stmt = conn.createStatement
val resultSet = stmt.executeQuery("""select * from atable""")

Ok.stream(Enumerator.unfold(resultSet) { (rs: ResultSet) =>
  if (rs.next()) {
    val chunk = // Read the result from the ResultSet and format it in the way you want it formatted
    Some((rs, chunk))
  } else None
}.onDoneEnumerating {
  resultSet.close()
  stmt.close()
  conn.close()
})
Run Code Online (Sandbox Code Playgroud)