如何并行处理文件的行?

mem*_*und 8 java future executorservice

我想读取一个大文件,处理每一行并将结果插入数据库.我的目标是并行处理行,因为每个进程都是一个冗长的任务.因此,我希望一个线程继续读取,多个线程继续处理,并且一个线程继续以块的形式插入到db.

我把它分解如下:

1)按顺序逐行读取文件(简单)

2)将每一行发送到线程池(3个线程),因为处理是长时间运行的任务.在线程池忙时阻止进一步读取行.

3)将每个处理行从每个theadpool写入StringBuffer

4)监视缓冲区大小,并将结果以块的形式写入数据库(例如,每1000个条目)

ExecutorService executor = Executors.newFixedThreadPool(3);

StringBuffer sb = new StringBuffer();

String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    Future<String> future = executor.submit(() -> {
        return processor.process(line);
    });

    //PROBLEM: this blocks until the future returns
    sb.append(future.get());

    if (count.get() == 100) {
        bufferChunk = sb;
        count = new AtomicInteger(0);
        sb = new StringBuffer();

        databaseService.batchInsert(bufferChunk.toString());
    }
}
Run Code Online (Sandbox Code Playgroud)

问题:

  • future.get() 将永远阻止读者,直到一个未来返回结果

  • 缓冲区"监控"可能做得不对

可能我不是以正确的方式做这件事.但是我怎样才能做到这一点?

旁注:filesize大约是10GB,所以我不能先将整个文件读入内存来准备并行任务.

gio*_*_go 0

  1. 读取文件大小。(File.length() 方法)并将其拆分为您的线程数。
  2. 使用 RandomAccessFile 搜索在 @1 处找到的索引之前的所有换行符。https://docs.oracle.com/javase/7/docs/api/java/io/RandomAccessFile.html
  3. 向每个线程发送新的索引/偏移量 + RandomAccessFile,并对每个线程进行读访问。
  4. 子类化 InputStream 以在 RandomAccessFile 之上创建一个新的 InputStream 并开始读取。