如何在关机后重用线程池

Sou*_*ast 44 java concurrency

我有一个包含超过7000万行的.csv文件,其中每行生成一个Runnable然后由threadpool执行.这个Runnable会在Mysql中插入一条记录.

更重要的是,我想记录一个用于定位的RandomAccessFile的csv文件的位置.该位置被写入文件.我想在线程池中的所有线程都完成时写入此记录.然后调用ThreadPoolExecutor.shutdown().但是当更多的线路出现时,我又需要一个线程池.如何重用当前的线程池而不是重新创建新的线程池.

代码如下:

public static boolean processPage() throws Exception {

    long pos = getPosition();
    long start = System.currentTimeMillis();
    raf.seek(pos);
    if(pos==0)
        raf.readLine();
    for (int i = 0; i < PAGESIZE; i++) {
        String lineStr = raf.readLine();
        if (lineStr == null)
            return false;
        String[] line = lineStr.split(",");
        final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
        //System.out.println("userId: "+log.getUserId()%512);

        pool.execute(new Runnable(){
            public void run(){
                try {
                    experienceService.insertExperienceLog(log);
                } catch (BaseException e) {
                    e.printStackTrace();
                }
            }
        });

        long end = System.currentTimeMillis();
    }

    BufferedWriter resultWriter = new BufferedWriter(
            new OutputStreamWriter(new FileOutputStream(new File(
                    RESULT_FILENAME), true)));
    resultWriter.write("\n");
    resultWriter.write(String.valueOf(raf.getFilePointer()));
    resultWriter.close();
    long time = System.currentTimeMillis()-start;
    System.out.println(time);
    return true;
}
Run Code Online (Sandbox Code Playgroud)

谢谢 !

Bru*_*eis 42

文档中所述,您无法重用ExecutorService已关闭的文件.我建议不要采取任何解决方法,因为(a)它们可能无法在所有情况下按预期工作; (b)您可以使用标准课程达到您想要的效果.

你必须要么

  1. 实例化一个新的ExecutorService; 要么

  2. 不要终止ExecutorService.

第一个解决方案很容易实现,所以我不会详细说明.

对于第二个,由于您希望在所有提交的任务完成后执行操作,您可以查看ExecutorCompletionService并使用它.它包装了一个ExecutorService将进行线程管理的东西,但是runnables将被包装成能告诉ExecutorCompletionService它们什么时候完成的东西,所以它可以向你报告:

ExecutorService executor = ...;
ExecutorCompletionService ecs = new ExecutorCompletionService(executor);

for (int i = 0; i < totalTasks; i++) {
  ... ecs.submit(...); ...
}

for (int i = 0; i < totalTasks; i++) {
  ecs.take();
}
Run Code Online (Sandbox Code Playgroud)

take()上的方法ExecutorCompletionService将阻塞,直到任务完成(正常或突然).它将返回a Future,因此您可以根据需要检查结果.

我希望这可以帮助你,因为我没有完全理解你的问题.