mem*_*und 8 java future executorservice
我想读取一个大文件,处理每一行并将结果插入数据库.我的目标是并行处理行,因为每个进程都是一个冗长的任务.因此,我希望一个线程继续读取,多个线程继续处理,并且一个线程继续以块的形式插入到db.
我把它分解如下:
1)按顺序逐行读取文件(简单)
2)将每一行发送到线程池(3个线程),因为处理是长时间运行的任务.在线程池忙时阻止进一步读取行.
3)将每个处理行从每个theadpool写入StringBuffer
4)监视缓冲区大小,并将结果以块的形式写入数据库(例如,每1000个条目)
ExecutorService executor = Executors.newFixedThreadPool(3);
StringBuffer sb = new StringBuffer();
String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
count.getAndIncrement();
Future<String> future = executor.submit(() -> {
return processor.process(line);
});
//PROBLEM: this blocks until the future returns
sb.append(future.get());
if (count.get() == 100) {
bufferChunk = sb;
count = new AtomicInteger(0);
sb = new StringBuffer();
databaseService.batchInsert(bufferChunk.toString());
}
}
Run Code Online (Sandbox Code Playgroud)
问题:
future.get() 将永远阻止读者,直到一个未来返回结果
缓冲区"监控"可能做得不对
可能我不是以正确的方式做这件事.但是我怎样才能做到这一点?
旁注:filesize大约是10GB,所以我不能先将整个文件读入内存来准备并行任务.
| 归档时间: |
|
| 查看次数: |
703 次 |
| 最近记录: |