流畅的光滑/ scala

Saj*_*lva 7 streaming scala reactive-programming slick

我正在看scala/slick流,并试图了解它是如何工作的.这是我的测试代码

    val bigdata = TableQuery[BigData] 
    val x = db.stream(bigdata.result.transactionally.withStatementParameters(fetchSize = 100)).foreach {
      (tuple: (Int, UUID)) =>
        println(tuple._1 + " " + tuple._2)
        Thread.sleep(50)//emulating slow consumer.
    }

    Await.result(x, 100000 seconds)
Run Code Online (Sandbox Code Playgroud)

在代码运行时,我启用了postgresql查询日志,以了解最新情况.我看到每100个元素发生一次重新查询

2015-11-06 15:03:24 IST [24379-3] postgres @scala_test日志:执行从S_2/C_3获取:选择x2."id",x2."data"来自"bigdata"x2 2015-11-06 15:03:29 IST [24379-4] postgres @ scala_test日志:执行

fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:34 IST [24379-5] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:39 IST [24379-6] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:44 IST [24379-7] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:49 IST [24379-8] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
Run Code Online (Sandbox Code Playgroud)

然而,它看起来像是在获取整个数据集.我期待一个带偏移量的查询.

ie SELECT * FROM bigdata  LIMIT 100 OFFSET 500
Run Code Online (Sandbox Code Playgroud)

看起来一切都被查询,发送数据部分发送.

然后在上面的流媒体运行时,我将新的数据集插入到同一个表中.

在流媒体之前

SELECT count(*) FROM bigdata -> 500
Run Code Online (Sandbox Code Playgroud)

然后插入几行

SELECT count(*) FROM bigdata  -> 700
Run Code Online (Sandbox Code Playgroud)

但流式传输停止在500.这似乎表明新数据永远不会被提取和流回.任何关于流媒体如何工作的想法.