我有一个包含超过7000万行的.csv文件,其中每行生成一个Runnable然后由threadpool执行.这个Runnable会在Mysql中插入一条记录.
更重要的是,我想记录一个用于定位的RandomAccessFile的csv文件的位置.该位置被写入文件.我想在线程池中的所有线程都完成时写入此记录.然后调用ThreadPoolExecutor.shutdown().但是当更多的线路出现时,我又需要一个线程池.如何重用当前的线程池而不是重新创建新的线程池.
代码如下:
public static boolean processPage() throws Exception {
long pos = getPosition();
long start = System.currentTimeMillis();
raf.seek(pos);
if(pos==0)
raf.readLine();
for (int i = 0; i < PAGESIZE; i++) {
String lineStr = raf.readLine();
if (lineStr == null)
return false;
String[] line = lineStr.split(",");
final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
//System.out.println("userId: "+log.getUserId()%512);
pool.execute(new Runnable(){
public void run(){
try {
experienceService.insertExperienceLog(log);
} catch (BaseException e) {
e.printStackTrace();
}
}
});
long end = System.currentTimeMillis();
}
BufferedWriter resultWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(new File(
RESULT_FILENAME), true)));
resultWriter.write("\n");
resultWriter.write(String.valueOf(raf.getFilePointer()));
resultWriter.close();
long time = System.currentTimeMillis()-start;
System.out.println(time);
return true;
}
Run Code Online (Sandbox Code Playgroud)
谢谢 !
Bru*_*eis 42
如文档中所述,您无法重用ExecutorService已关闭的文件.我建议不要采取任何解决方法,因为(a)它们可能无法在所有情况下按预期工作; (b)您可以使用标准课程达到您想要的效果.
你必须要么
实例化一个新的ExecutorService; 要么
不要终止ExecutorService.
第一个解决方案很容易实现,所以我不会详细说明.
对于第二个,由于您希望在所有提交的任务完成后执行操作,您可以查看ExecutorCompletionService并使用它.它包装了一个ExecutorService将进行线程管理的东西,但是runnables将被包装成能告诉ExecutorCompletionService它们什么时候完成的东西,所以它可以向你报告:
ExecutorService executor = ...;
ExecutorCompletionService ecs = new ExecutorCompletionService(executor);
for (int i = 0; i < totalTasks; i++) {
... ecs.submit(...); ...
}
for (int i = 0; i < totalTasks; i++) {
ecs.take();
}
Run Code Online (Sandbox Code Playgroud)
类take()上的方法ExecutorCompletionService将阻塞,直到任务完成(正常或突然).它将返回a Future,因此您可以根据需要检查结果.
我希望这可以帮助你,因为我没有完全理解你的问题.
| 归档时间: |
|
| 查看次数: |
24805 次 |
| 最近记录: |