使用 Slick (JDBC) Connector for Alpakka 时使用 Paging SQL 语句是否有意义

cok*_*mpf 1 scala slick akka-stream alpakka

我目前想知道 Alpakka 的 Slick (JDBC) 连接器是如何在幕后工作的 - 我真的无法使用文档找到答案。

考虑一个用例,我想处理从数据库中选择的大量记录。我可以简单地SELECT * FROM [TABLE]在单个流中使用 a 吗,或者为每个页面(一个接一个)启动多个流是否有意义,例如SELECT * FROM [TABLE] LIMIT 0,1000.

我希望/认为 Slick Connector Alpakka 的反应式方式只在流需要它们时才从数据库中获取记录,以便我可以使用SELECT * FROM [TABLE]...

谁能给我一些见解或一些好的文档来阅读?

Jef*_*ung 7

考虑 AlpakkaSlick.source方法的源代码:

/**
 * Scala API: creates a Source[T, NotUsed] that performs the
 *            specified query against the (implicitly) specified
 *            Slick database and streams the results.
 *            This works for both "typed" Slick queries
 *            and "plain SQL" queries.
 *
 * @param streamingQuery The Slick query to execute, which can
 *                       be either a "typed" query or a "plain SQL"
 *                       query produced by one of the Slick "sql..."
 *                       String interpolators
 * @param session The database session to use.
 */
def source[T](
    streamingQuery: StreamingDBIO[Seq[T], T]
)(implicit session: SlickSession): Source[T, NotUsed] =
  Source.fromPublisher(session.db.stream(streamingQuery))
Run Code Online (Sandbox Code Playgroud)

session.db.stream(streamingQuery))上面的结果是 a DatabasePublisher,这是一个Publisher传递给 Akka Stream 的Reactive Streams Source.fromPublisher。不要担心尝试为数据子集创建多个流;您可以安全地使用检索表中所有行的查询,并将结果Source作为单个流处理。

需要注意的一件事是,您可能需要配置一些设置,而不是在 Alpakka 文档中提到的,而是在Slick 文档中提到的:

注意:某些数据库系统可能需要以某种方式设置会话参数以支持流式传输,而无需在客户端的内存中一次缓存所有数据。例如,PostgreSQL 需要.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)(具有所需的页面大小n)和.transactionally适当的流媒体。

因此,例如,如果您使用的是 PostgreSQL,那么您的代码Source可能如下所示:

val source =
  Slick.source(
    TableQuery[Items]
      .result
      .withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      )
      .transactionally)
Run Code Online (Sandbox Code Playgroud)

TableQuery[Items].result返回表中与 关联的所有行Items

尽管有文档,我已经成功地将 SlickDatabasePublisher与 Akka Streams 结合使用,从 PostgreSQL 的表中检索和更新数百万行,而无需设置withStatementParameterstransactionally. 在没有这些设置的情况下尝试:

val source = Slick.source(TableQuery[Items].result)
Run Code Online (Sandbox Code Playgroud)