Joe*_*tra 6 java ram java-stream database-cursor
我想使用 Java Stream 使用数据库游标。我希望 Java 流根据需要获取和处理行,并避免首先将所有 500 万行加载到内存中,然后再处理它们。
是否可以在不将整个表加载到 RAM 中的情况下使用它?
到目前为止我的代码如下所示:
Cursor<Product> products = DAO.selectCursor(...);
// 1. Initialize variables
long count = 0;
...
for (Iterator<Product> it = products.iterator(); it.hasNext();) {
Product p = it.next();
// 2. Processing each row
...
}
// 3. Concluding (processing totals, stats, etc.)
double avg = total / count;
...
Run Code Online (Sandbox Code Playgroud)
它确实工作得很好,但是有点麻烦,我想利用 Stream API。
首先,我们必须讨论如何从数据库中获取数据。如果您的目的是查看大量记录,并且不想将它们一次性加载到内存中,那么您有两种选择:
如果您已经有一个基于 的迭代器,可以根据Cursor需要检索分页数据,那么您可以使用SpliteratorsJDK StreamSupportAPI 中的实用程序类将其转换为Stream.
Stream<Product> products = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(cursor.iterator(),
Spliterator.NONNULL |
Spliterator.ORDERED |
Spliterator.IMMUTABLE), false)
Run Code Online (Sandbox Code Playgroud)
否则你将不得不构建自己的东西。
如果您的 JDBC 驱动程序支持获取大小属性,您可以执行以下操作:
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
Run Code Online (Sandbox Code Playgroud)
此时,rs包含第一次获取 1000 条记录,并且在您阅读上一页之前不会从数据库中检索更多记录。
所有这一切的棘手部分是,在读取完所有记录之前,您无法关闭任何资源(即连接、准备好的语句和结果集),并且由于我们要构建的流默认情况下是惰性的,这意味着我们有保持所有这些资源打开,直到我们完成流。
也许最简单的方法是围绕这个逻辑构建一个迭代器,当迭代器实际到达所有数据的末尾时,您可以关闭所有资源(即!rs.next()),或者另一种选择是在流关闭时完成所有工作( Stream.onClose())。
一旦我们有了迭代器,就可以非常简单地使用JDK API 中的实用程序类Spliterators来构建流。StreamSupport
我的基本实现看起来有点像这样。这仅用于说明目的。您可能想对您的特定案例给予更多的关爱。
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
Run Code Online (Sandbox Code Playgroud)
这里的关键点是要注意我们返回的流应该运行一些onClose逻辑,因此当我们使用流时必须确保我们stream.close()在完成它时执行 a 以确保我们关闭到目前为止保持活动状态的所有资源(即conn,stm和rs)。
最好的方法也许是使用 try-with-resources,这样 try 将负责关闭流。
public Stream<String> getUsers() {
DataSource ds = jdbcTemplate.getDataSource();
try {
Connection conn = ds.getConnection();
conn.setAutoCommit(false);
PreparedStatement stm = conn.prepareStatement("SELECT id FROM users", ResultSet.TYPE_FORWARD_ONLY);
//fetch size is what guarantees only 1000 records at the time
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
Iterator<String> sqlIter = new Iterator<>() {
@Override
public boolean hasNext() {
try {
return rs.next();
} catch (SQLException e) {
closeResources(conn, stm, rs);
throw new RuntimeException("Failed to read record from ResultSet", e);
}
}
@Override
public String next() {
try {
return rs.getString("id");
} catch (SQLException e) {
closeResources(conn, stm, rs);
throw new RuntimeException("Failed to read record from ResultSet", e);
}
}
};
//turn iterator into a stream
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(sqlIter,
Spliterator.NONNULL |
Spliterator.ORDERED |
Spliterator.IMMUTABLE), false
).onClose(() -> {
//make sure to close resources when done with the stream
closeResources(conn, stm, rs);
});
} catch (SQLException e) {
logger.error("Failed to process data", e);
throw new RuntimeException(e);
}
}
private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) {
try (conn; ps; rs) {
logger.info("Resources successfully closed");
} catch (SQLException e) {
logger.warn("Failed to properly close database sources", e);
}
}
Run Code Online (Sandbox Code Playgroud)
另一种方法是您自己对结果进行分页,这取决于数据库,但使用限制和偏移量等选择子句,您可以请求特定的记录页面,处理它们,然后检索更多记录。
try(Stream<String> users = userRepo.getUsers()){
//print users to the main output retrieving 1K at the time
users.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,您的迭代器将消耗所有页面,完成后请求下一页,直到在最终页面中找不到更多记录。
另一种方法的优点是可以在迭代器本身中立即控制资源。
我不会开发一个这样的例子,留给你尝试。
| 归档时间: |
|
| 查看次数: |
2894 次 |
| 最近记录: |