Tra*_*svi 4 java parallel-processing
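I need to write a program in Java that reads a relatively large number (~50,000) of files in a directory tree, processes the data, and writes the processed data to a separate (flat) directory.
Currently I have something like this: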
private void crawlDirectoyAndProcessFiles(File directory) {
    for (File file : directory.listFiles()) {
        if (file.isDirectory()) {
            crawlDirectoyAndProcessFiles(file);
        } else {
            Data d = readFile(file);
            ProcessedData p = d.process();
            writeFile(p, file.getAbsolutePath(), outputDir);
        }
    }
}
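Suffice it to say that each of the methods has been stripped and trimmed down for readability, but they all work fine. The whole process works, except that it is slow. The data is processed by a remote service, which takes 5-15 seconds per file. Multiply that by 50,000...
I have never done anything multi-threaded before, but I figure I could get a substantial speedup if I did. Can anyone give me some pointers on how to parallelize this method effectively?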
Ted*_*opp 11
I would use a ThreadPoolExecutor to manage the threads. You can do something like this:
private class Processor implements Runnable {
    private final File file;

    public Processor(File file) {
        this.file = file;
    }

    @Override
    public void run() {
        // Same work as the body of the sequential loop, now run on a pool thread.
        Data d = readFile(file);
        ProcessedData p = d.process();
        writeFile(p, file.getAbsolutePath(), outputDir);
    }
}

private void crawlDirectoryAndProcessFiles(File directory, Executor executor) {
    for (File file : directory.listFiles()) {
        if (file.isDirectory()) {
            crawlDirectoryAndProcessFiles(file, executor);
        } else {
            // Queue the file; the pool decides when it actually runs.
            executor.execute(new Processor(file));
        }
    }
}
You would obtain an Executor like this:
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
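Here poolSize is the maximum number of threads you want running at once. (It is important to pick a reasonable number here; 50,000 threads is not a good idea. A reasonable number might be 8.) Note that once you have queued all the files, your main thread can wait for the work to finish by calling executor.shutdown() followed by executor.awaitTermination. Without the shutdown() call, awaitTermination simply waits out its timeout, because the pool never terminates on its own.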
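The submit-then-wait pattern described above can be sketched as follows. This is a minimal illustration, not the answer's own code: the class name PoolShutdownSketch, the runAll helper, and the counter task used in main are hypothetical stand-ins for the real read/process/write work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolShutdownSketch {

    // Runs every task on a fixed-size pool and blocks until the pool drains.
    // Returns true if all tasks finished before the timeout elapsed.
    static boolean runAll(List<Runnable> tasks, int poolSize, long timeoutSeconds)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (Runnable task : tasks) {
            executor.execute(task);
        }
        // Stop accepting new work; already queued tasks still run to completion.
        executor.shutdown();
        boolean finished = executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        if (!finished) {
            executor.shutdownNow(); // interrupt whatever is still running
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger done = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            tasks.add(done::incrementAndGet); // hypothetical placeholder for real work
        }
        boolean finished = runAll(tasks, 8, 60);
        System.out.println("finished=" + finished + ", tasks run=" + done.get());
    }
}
```

With 50,000 files at 5-15 seconds each, the timeout passed to awaitTermination has to be generous; the 60 seconds used in main is only suitable for the toy tasks here.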
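Assuming you have a single hard disk (i.e. something that allows only one read operation at a time, as opposed to an SSD, a RAID array, a network file system, etc.), you want only one thread performing I/O (reading from and writing to the disk). You also want only as many threads doing CPU-bound work as you have cores; otherwise time is wasted on context switching.
Given these constraints, the code below should work for you. The single-thread executor ensures that only one Runnable executes at any one time. The fixed thread pool ensures that no more than NUM_CPUS Runnables execute at any one time.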
One thing this does not do is provide feedback on when processing has finished.
private final static int NUM_CPUS = 4;

// One thread for all disk I/O; NUM_CPUS threads for CPU-bound processing.
private final Executor _fileReaderWriter = Executors.newSingleThreadExecutor();
private final Executor _fileProcessor = Executors.newFixedThreadPool(NUM_CPUS);

private final class Data {}
private final class ProcessedData {}

// Stage 1: read a file on the I/O thread, then hand the data to the processing pool.
private final class FileReader implements Runnable
{
    private final File _file;

    FileReader(final File file) { _file = file; }

    @Override public void run()
    {
        final Data data = readFile(_file);
        _fileProcessor.execute(new FileProcessor(_file, data));
    }

    private Data readFile(File file) { /* ... */ return null; }
}

// Stage 2: process the data on the pool, then queue the write on the I/O thread.
private final class FileProcessor implements Runnable
{
    private final File _file;
    private final Data _data;

    FileProcessor(final File file, final Data data) { _file = file; _data = data; }

    @Override public void run()
    {
        final ProcessedData processedData = processData(_data);
        _fileReaderWriter.execute(new FileWriter(_file, processedData));
    }

    private ProcessedData processData(final Data data) { /* ... */ return null; }
}

// Stage 3: write the result on the I/O thread.
private final class FileWriter implements Runnable
{
    private final File _file;
    private final ProcessedData _data;

    FileWriter(final File file, final ProcessedData data) { _file = file; _data = data; }

    @Override public void run()
    {
        writeFile(_file, _data);
    }

    private void writeFile(final File file, final ProcessedData data) { /* ... */ }
}

// Entry point: walk the tree and queue a read for every regular file.
public void process(final File file)
{
    if (file.isDirectory())
    {
        for (final File subFile : file.listFiles())
            process(subFile);
    }
    else
    {
        _fileReaderWriter.execute(new FileReader(file));
    }
}
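One possible way to get completion feedback for a read/process/write pipeline like the one above is to chain the three stages with CompletableFuture and wait on all of them. This swaps in a different technique from the Runnable hand-off in the answer, so treat it as a sketch under assumptions: the class name PipelineSketch, the runPipeline helper, and the read, process, and write parameters are hypothetical stand-ins for readFile, processData, and writeFile.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

public class PipelineSketch {

    // Reads and writes on a single I/O thread, processes on a fixed pool,
    // and returns the item count only after every item has been written.
    static <I, D, P> int runPipeline(List<I> inputs,
                                     Function<I, D> read,
                                     Function<D, P> process,
                                     Consumer<P> write,
                                     int workers) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService cpu = Executors.newFixedThreadPool(workers);
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (I input : inputs) {
                pending.add(CompletableFuture
                        .supplyAsync(() -> read.apply(input), io) // stage 1 on the I/O thread
                        .thenApplyAsync(process, cpu)             // stage 2 on the pool
                        .thenAcceptAsync(write, io));             // stage 3 back on the I/O thread
            }
            // join() blocks until every chain has finished and rethrows a failure, if any.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return pending.size();
        } finally {
            io.shutdown();
            cpu.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("a.txt", "b.txt", "c.txt");
        List<String> written = new ArrayList<>(); // only touched by the single I/O thread
        Function<String, Integer> read = String::length;   // placeholder for readFile
        Function<Integer, Integer> process = len -> len * 2; // placeholder for processData
        Consumer<Integer> write = len -> written.add("out:" + len); // placeholder for writeFile
        int n = runPipeline(names, read, process, write, 4);
        System.out.println(n + " items done: " + written);
    }
}
```

Because runPipeline returns only after allOf(...).join() completes, the caller knows all writes have happened at that point. Writes are not guaranteed to occur in input order.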