并行读写多个文件

Tra*_*svi 4 java parallel-processing

我需要用Java编写一个程序,它将读取目录树中相对较多的(~50,000)个文件,处理数据,并在单独的(平面)目录中输出处理过的数据.

目前我有这样的事情:

private void crawlDirectoyAndProcessFiles(File directory) {
  for (File file : directory.listFiles()) {
    if (file.isDirectory()) {
      crawlDirectoyAndProcessFiles(file);
    } else { 
      Data d = readFile(file);
      ProcessedData p = d.process();
      writeFile(p,file.getAbsolutePath(),outputDir);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

可以说,为了便于阅读,每个方法都被删除和修剪,但它们都可以正常工作.整个过程工作正常,但速度很慢.数据处理通过远程服务进行,需要5-15秒.乘以50,000 ...

我之前从未做过任何多线程的事情,但我认为如果我这样做,我可以获得一些非常好的速度提升.任何人都可以指出我如何有效地并行化这种方法?

Ted*_*opp 11

我会使用ThreadPoolExecutor来管理线程.你可以这样做:

private class Processor implements Runnable {
    private final File file;

    public Processor(File file) {
        this.file = file;
    }

    @Override
    public void run() {
        Data d = readFile(file);
        ProcessedData p = d.process();
        writeFile(p,file.getAbsolutePath(),outputDir);
    }
}

private void crawlDirectoryAndProcessFiles(File directory, Executor executor) {
    for (File file : directory.listFiles()) {
        if (file.isDirectory()) {
          crawlDirectoryAndProcessFiles(file,executor);
        } else {
            executor.execute(new Processor(file); 
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

您将使用以下方式获得Executor:

ExecutorService executor = Executors.newFixedThreadPool(poolSize);
Run Code Online (Sandbox Code Playgroud)

这里poolSize是你想要去一次的最大线程数.(这里有一个合理的数字很重要; 50,000个线程并不是一个好主意.一个合理的数字可能是8.)请注意,在排队所有文件之后,你的主线程可以等到通过调用完成任务executor.awaitTermination.


Sim*_*onC 6

假设您有一个硬盘(即只允许单个同时读取操作,而不是SSD或RAID阵列,网络文件系统等...),那么您只需要一个线程执行IO(读取/写入磁盘).此外,您只需要与拥有内核一样多的线程执行CPU绑定操作,否则将浪费时间在上下文切换中.

鉴于上述限制,下面的代码应该适合您.单线程执行程序确保一次只Runnable执行一个.固定线程池确保NUM_CPUS Runnable在任何时候都不会执行s.

这样做的一件事是提供有关何时完成处理的反馈.

private final static int NUM_CPUS = 4;

private final Executor _fileReaderWriter = Executors.newSingleThreadExecutor();
private final Executor _fileProcessor = Executors.newFixedThreadPool(NUM_CPUS);

private final class Data {}
private final class ProcessedData {}

private final class FileReader implements Runnable
{
  private final File _file;
  FileReader(final File file) { _file = file; }
  @Override public void run() 
  { 
    final Data data = readFile(_file);
    _fileProcessor.execute(new FileProcessor(_file, data));
  }

  private Data readFile(File file) { /* ... */ return null; }    
}

private final class FileProcessor implements Runnable
{
  private final File _file;
  private final Data _data;
  FileProcessor(final File file, final Data data) { _file = file; _data = data; }
  @Override public void run() 
  { 
    final ProcessedData processedData = processData(_data);
    _fileReaderWriter.execute(new FileWriter(_file, processedData));
  }

  private ProcessedData processData(final Data data) { /* ... */ return null; }
}

private final class FileWriter implements Runnable
{
  private final File _file;
  private final ProcessedData _data;
  FileWriter(final File file, final ProcessedData data) { _file = file; _data = data; }
  @Override public void run() 
  { 
    writeFile(_file, _data);
  }

  private Data writeFile(final File file, final ProcessedData data) { /* ... */ return null; }
}

public void process(final File file)   
{ 
  if (file.isDirectory())
  {
    for (final File subFile : file.listFiles())
      process(subFile);
  }
  else
  {
    _fileReaderWriter.execute(new FileReader(file));
  }
}
Run Code Online (Sandbox Code Playgroud)