使用 Java Stream 使用数据库游标

Joe*_*tra 6 java ram java-stream database-cursor

我想使用 Java Stream 使用数据库游标。我希望 Java 流根据需要获取和处理行,并避免首先将所有 500 万行加载到内存中,然后再处理它们。

是否可以在不将整个表加载到 RAM 中的情况下使用它?

到目前为止我的代码如下所示:

Cursor<Product> products = DAO.selectCursor(...);

// 1. Initialize variables
long count = 0;
...
for (Iterator<Product> it = products.iterator(); it.hasNext();) {
  Product p = it.next();
  // 2. Processing each row
  ...
}
// 3. Concluding (processing totals, stats, etc.)
double avg = total / count;
...
Run Code Online (Sandbox Code Playgroud)

它确实工作得很好,但是有点麻烦,我想利用 Stream API。

Edw*_*rzo 4

首先,我们必须讨论如何从数据库中获取数据。如果您的目的是查看大量记录,并且不想将它们一次性加载到内存中,那么您有两种选择:

  1. 对结果进行分页。
  2. 让您的驱动程序对结果进行分页。

如果您已经有一个基于 的迭代器,可以根据Cursor需要检索分页数据,那么您可以使用SpliteratorsJDK StreamSupportAPI 中的实用程序类将其转换为Stream.

Stream<Product> products = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(cursor.iterator(),
                        Spliterator.NONNULL |
                                Spliterator.ORDERED |
                                Spliterator.IMMUTABLE), false)
Run Code Online (Sandbox Code Playgroud)

否则你将不得不构建自己的东西。

驱动程序分页

如果您的 JDBC 驱动程序支持获取大小属性,您可以执行以下操作:

Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
Run Code Online (Sandbox Code Playgroud)

此时,rs包含第一次获取 1000 条记录,并且在您阅读上一页之前不会从数据库中检索更多记录。

所有这一切的棘手部分是,在读取完所有记录之前,您无法关闭任何资源(即连接、准备好的语句和结果集),并且由于我们要构建的流默认情况下是惰性的,这意味着我们有保持所有这些资源打开,直到我们完成流。

也许最简单的方法是围绕这个逻辑构建一个迭代器,当迭代器实际到达所有数据的末尾时,您可以关闭所有资源(即!rs.next()),或者另一种选择是在流关闭时完成所有工作( Stream.onClose())。

一旦我们有了迭代器,就可以非常简单地使用JDK API 中的实用程序类Spliterators来构建流。StreamSupport

我的基本实现看起来有点像这样。这仅用于说明目的。您可能想对您的特定案例给予更多的关爱。

Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
Run Code Online (Sandbox Code Playgroud)

这里的关键点是要注意我们返回的流应该运行一些onClose逻辑,因此当我们使用流时必须确保我们stream.close()在完成它时执行 a 以确保我们关闭到目前为止保持活动状态的所有资源(即connstmrs)。

最好的方法也许是使用 try-with-resources,这样 try 将负责关闭流。

public Stream<String> getUsers() {
    DataSource ds = jdbcTemplate.getDataSource();
    try {
        Connection conn = ds.getConnection();
        conn.setAutoCommit(false);
        PreparedStatement stm = conn.prepareStatement("SELECT id FROM users", ResultSet.TYPE_FORWARD_ONLY);
        //fetch size is what guarantees only 1000 records at the time
        stm.setFetchSize(1000);
        ResultSet rs = stm.executeQuery();

        Iterator<String> sqlIter = new Iterator<>() {
            @Override
            public boolean hasNext() {
                try {
                    return rs.next();
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }

            @Override
            public String next() {
                try {
                    return rs.getString("id");
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }
        };

        //turn iterator into a stream
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(sqlIter,
                        Spliterator.NONNULL |
                                Spliterator.ORDERED |
                                Spliterator.IMMUTABLE), false
        ).onClose(() -> {
            //make sure to close resources when done with the stream
            closeResources(conn, stm, rs);
        });


    } catch (SQLException e) {
        logger.error("Failed to process data", e);
        throw new RuntimeException(e);
    }
}

private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) {
    try (conn; ps; rs) {
        logger.info("Resources successfully closed");
    } catch (SQLException e) {
        logger.warn("Failed to properly close database sources", e);
    }
}
Run Code Online (Sandbox Code Playgroud)

手动分页

另一种方法是您自己对结果进行分页,这取决于数据库,但使用限制和偏移量等选择子句,您可以请求特定的记录页面,处理它们,然后检索更多记录。

try(Stream<String> users = userRepo.getUsers()){
    //print users to the main output retrieving 1K at the time
    users.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,您的迭代器将消耗所有页面,完成后请求下一页,直到在最终页面中找不到更多记录。

另一种方法的优点是可以在迭代器本身中立即控制资源。

我不会开发一个这样的例子,留给你尝试。