Rit*_*ita 2 java-8 java-stream
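I'm retrieving large chunks of data from the DB and writing that data somewhere else. To avoid long processing times, I'm trying to use a parallel stream for the writing.
When I run it as a sequential stream, it works perfectly. However, if I change it to parallel, the behavior is strange: it prints the same object multiple times (more than 10).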
@PostConstruct
public void retrieveAllTypeRecords() throws SQLException {
    logger.info("Retrieve batch of Type records.");
    try {
        Stream<TypeRecord> typeQueryAsStream = jdbcStream.getTypeQueryAsStream();
        typeQueryAsStream.forEach((type) -> {
            logger.info("Printing Type with field1: {} and field2: {}.", type.getField1(), type.getField2()); //the same object gets printed here multiple times
            //write this object somewhere else
        });
        logger.info("Completed full retrieval of Type data.");
    } catch (Exception e) {
        logger.error("error: " + e);
    }
}

public Stream<TypeRecord> getTypeQueryAsStream() throws SQLException {
    String sql = typeRepository.getQueryAllTypesRecords(); //retrieves SQL query in String format
    TypeMapper typeMapper = new TypeMapper();
    JdbcStream.StreamableQuery query = jdbcStream.streamableQuery(sql);
    Stream<TypeRecord> stream = query.stream()
            .map(row -> {
                return typeMapper.mapRow(row); //maps columns values to object values
            });
    return stream;
}

public class StreamableQuery implements Closeable {
    (...)
    public Stream<SqlRow> stream() throws SQLException {
        final SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
        final SqlRow sqlRow = new SqlRowAdapter(rowSet);
        Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
            @Override
            public boolean hasNext() {
                return !rowSet.isLast();
            }

            @Override
            public SqlRow next() {
                if (!rowSet.next()) {
                    throw new NoSuchElementException();
                }
                return sqlRow;
            }
        }, Spliterator.CONCURRENT);
        return StreamSupport.stream(supplier, Spliterator.CONCURRENT, true); //this boolean sets the stream as parallel
    }
}
I also tried using typeQueryAsStream.parallel().forEach((type), but the result is the same.
Sample output:
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[main] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
Well, looking at your code,
final SqlRow sqlRow = new SqlRowAdapter(rowSet);
Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
    …
    @Override
    public SqlRow next() {
        if (!rowSet.next()) {
            throw new NoSuchElementException();
        }
        return sqlRow;
    }
}, Spliterator.CONCURRENT);
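You are returning the same object every time. The intended effect is achieved by implicitly modifying this object's state when rowSet.next() is called.
This obviously cannot work when multiple threads try to access that single object concurrently. Even buffering some items to hand them over to another thread will cause trouble. For the same reason, such interference can also cause problems with sequential streams as soon as stateful intermediate operations such as sorted or distinct are involved.
Assuming that typeMapper.mapRow(row) produces an actual data item that does not interfere with other data items, you should integrate this step into the stream source to create a valid stream: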
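A minimal, self-contained sketch of the problem described above, assuming no database at all: the hypothetical `MutableRow` class plays the role of `SqlRowAdapter`, and an `int[]` plays the role of the result set. Even a sequential stream goes wrong once elements are buffered, because the buffer ends up holding several references to one object:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SharedRowDemo {

    // Hypothetical stand-in for SqlRowAdapter: a single view whose state is
    // overwritten every time the underlying "cursor" advances.
    static final class MutableRow {
        int value;
    }

    // Builds a stream that, like the code in the question, hands out the
    // same MutableRow instance for every element.
    static List<Integer> collectViaSharedRow(int[] data) {
        MutableRow row = new MutableRow();
        Iterator<MutableRow> it = new Iterator<MutableRow>() {
            int pos = 0;

            @Override
            public boolean hasNext() {
                return pos < data.length;
            }

            @Override
            public MutableRow next() {
                if (!hasNext()) throw new NoSuchElementException();
                row.value = data[pos++]; // mutates the shared instance
                return row;
            }
        };
        List<MutableRow> rows = StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
                .collect(Collectors.toList()); // buffers references, not values
        // Every list entry is the same object, so all of them report the last value.
        return rows.stream().map(r -> r.value).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collectViaSharedRow(new int[] {1, 2, 3}));
    }
}
```

Running `main` prints `[3, 3, 3]` rather than `[1, 2, 3]`.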
public Stream<TypeRecord> stream(TypeMapper typeMapper) throws SQLException {
    SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
    SqlRow sqlRow = new SqlRowAdapter(rowSet);
    Spliterator<TypeRecord> sp = new Spliterators.AbstractSpliterator<TypeRecord>(
            Long.MAX_VALUE, Spliterator.CONCURRENT|Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super TypeRecord> action) {
            if(!rowSet.next()) return false;
            action.accept(typeMapper.mapRow(sqlRow));
            return true;
        }
    };
    return StreamSupport.stream(sp, true); //this boolean sets the stream as parallel
}
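The same idea can be tried without a database. In this sketch (all names hypothetical) a tiny forward-only `Cursor` stands in for `SqlRowSet` and a `Function` stands in for `TypeMapper.mapRow`. As in the answer, the mapping happens inside `tryAdvance`, so every element leaving the source is an independent object; only `ORDERED` is reported here:

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class MappedSourceDemo {

    // Hypothetical stand-in for SqlRowSet: a forward-only cursor over rows.
    static final class Cursor {
        private final String[] rows;
        private int pos = -1;

        Cursor(String[] rows) { this.rows = rows; }

        boolean next() { return ++pos < rows.length; }

        String current() { return rows[pos]; }
    }

    // Mirrors the answer's stream(TypeMapper): the mapper runs inside
    // tryAdvance, so each element handed out is a fresh, independent object.
    static <T> Stream<T> stream(Cursor cursor, Function<Cursor, T> mapper) {
        Spliterator<T> sp = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!cursor.next()) return false;
                action.accept(mapper.apply(cursor)); // copy the data out of the cursor
                return true;
            }
        };
        return StreamSupport.stream(sp, true); // parallel, as in the answer
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor(new String[] {"L1", "L2", "L3", "L4"});
        List<String> out = stream(cursor, c -> c.current().toLowerCase())
                .collect(Collectors.toList());
        System.out.println(out);
    }
}
```

Even though the stream is parallel, `main` prints `[l1, l2, l3, l4]`, because `collect` respects the encounter order of an `ORDERED` source.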
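Note that for a lot of use cases, like this one, implementing a Spliterator is simpler than implementing an Iterator (which needs to be wrapped via spliteratorUnknownSize anyway). Also, there is no need to encapsulate this instantiation in a Supplier.
A final note: the current stream implementation does not perform well for streams of unknown size, as it treats Long.MAX_VALUE like a very large number, ignoring the "unknown" semantics the specification assigns to it. Providing an estimated size is very beneficial for parallel performance, and it doesn't need to be exact. With the current implementation, even a completely made-up number, say 1000, may do better than correctly using Long.MAX_VALUE to denote an entirely unknown size.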
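To illustrate the point about `Iterator` versus `Spliterator`, here is a sketch (hypothetical names, with an in-memory `Cursor` in place of a `SqlRowSet`) that builds the same stream both ways. A cursor only offers `next()`, so the `Iterator` needs extra look-ahead state to answer `hasNext()`, while `tryAdvance` maps onto the cursor directly:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorVsSpliterator {

    // Hypothetical forward-only cursor, standing in for a ResultSet/SqlRowSet.
    static final class Cursor {
        private final int[] rows;
        private int pos = -1;
        Cursor(int[] rows) { this.rows = rows; }
        boolean next() { return ++pos < rows.length; }
        int current() { return rows[pos]; }
    }

    // Iterator route: hasNext() must advance the cursor ahead of time and
    // remember the outcome, and the result still has to be wrapped.
    static Stream<Integer> viaIterator(Cursor cursor) {
        Iterator<Integer> it = new Iterator<Integer>() {
            private Boolean advanced; // null = cursor not yet advanced for the next element

            @Override
            public boolean hasNext() {
                if (advanced == null) advanced = cursor.next();
                return advanced;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                advanced = null; // consume the looked-ahead row
                return cursor.current();
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    // Spliterator route: a single method, no look-ahead state.
    static Stream<Integer> viaSpliterator(Cursor cursor) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Integer>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Integer> action) {
                if (!cursor.next()) return false;
                action.accept(cursor.current());
                return true;
            }
        }, false);
    }

    public static void main(String[] args) {
        int[] data = {10, 20, 30};
        List<Integer> a = viaIterator(new Cursor(data)).collect(Collectors.toList());
        List<Integer> b = viaSpliterator(new Cursor(data)).collect(Collectors.toList());
        System.out.println(a + " " + b);
    }
}
```

Both routes yield `[10, 20, 30]`; the difference is only in how much bookkeeping the source needs.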
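As a sketch of passing an estimate instead of `Long.MAX_VALUE` (the counter-based source is hypothetical, and 1000 is just the made-up guess mentioned in the paragraph above): the first argument of the `Spliterators.AbstractSpliterator` constructor is what `estimateSize()` reports. Because `SIZED` is not reported, the estimate is not promised to be exact, so an inexact value changes how the work may be split but not the result:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class EstimatedSizeDemo {

    // Hypothetical source producing 0, 1, ..., count - 1, with `estimate`
    // passed through as the spliterator's size estimate (SIZED not reported).
    static Spliterator<Integer> source(int count, long estimate) {
        return new Spliterators.AbstractSpliterator<Integer>(estimate, Spliterator.ORDERED) {
            private int next = 0;

            @Override
            public boolean tryAdvance(Consumer<? super Integer> action) {
                if (next >= count) return false;
                action.accept(next++);
                return true;
            }
        };
    }

    public static void main(String[] args) {
        Spliterator<Integer> unknown = source(5000, Long.MAX_VALUE);
        Spliterator<Integer> guessed = source(5000, 1000); // deliberately inexact guess
        System.out.println(unknown.estimateSize() + " vs " + guessed.estimateSize());

        // The guess influences how the parallel stream splits work, not the result.
        Stream<Integer> s = StreamSupport.stream(source(5000, 1000), true);
        System.out.println(s.mapToLong(Integer::longValue).sum());
    }
}
```

`main` prints `9223372036854775807 vs 1000` followed by `12497500`.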