使用光滑的3.0.0流式传输结果和Postgresql的正确方法是什么?

Tat*_*lay 12 postgresql scala slick akka-stream slick-3.0

我想弄清楚如何使用光滑的流媒体.我使用带有postgres驱动程序的光滑3.0.0

情况如下:服务器必须将客户端数据序列拆分为大小(以字节为单位)限制的块.所以,我写了以下光滑的查询:

val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ? s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))
Run Code Online (Sandbox Code Playgroud)

我将seq与akka-streams结合起来Source,编写自定义PushPullStage,限制数据大小(以字节为单位),并在达到大小限制时完成上游.它工作得很好.问题是 - 当我查看postgres日志时,我看到这样的查询 select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

因此,乍一看似乎有很多(并且不必要的)数据库查询正在进行,只是在每个查询中使用几个字节.使用Slick进行流式传输的正确方法是什么,以便最大限度地减少数据库查询并充分利用每个查询中传输的数据?

Rik*_*ard 13

使用Slick和Postgres进行流媒体的"正确方法"包括三件事:

  1. 必须使用db.stream()

  2. 必须autoCommit在JDBC驱动程序中禁用.一种方法是通过后缀使查询在事务中运行.transactionally.

  3. 必须设置fetchSize为0以外的其他内容,否则postgres会将整个resultSet一次性推送到客户端.

例如:

DB.stream(
  find(0L, 0L)
    .transactionally
    .withStatementParameters(fetchSize = 1000)
).foreach(println)
Run Code Online (Sandbox Code Playgroud)

有用的链接:

https://github.com/slick/slick/issues/1038

https://github.com/slick/slick/issues/809

  • 为什么实际禁用自动提交? (3认同)