ExecutorService,如何等待所有任务完成

geo*_*ley 186 java multithreading executorservice threadpool

等待ExecutorService完成所有任务的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的工作 - 每个核心一个.现在我的设置如下:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)

ComputeDTask实现runnable.这似乎正确执行任务,但代码崩溃wait()IllegalMonitorStateException.这很奇怪,因为我玩了一些玩具示例,它似乎工作.

uniquePhrases包含数万个元素.我应该使用其他方法吗?我正在寻找尽可能简单的东西

and*_*soj 205

最简单的方法是使用ExecutorService.invokeAll()哪种方法可以满足您的要求.用你的说法,你需要修改或包装ComputeDTask实现Callable<>,这可以给你更多的灵活性.可能在你的应用程序中有一个有意义的实现Callable.call(),但是这里有一种方法来包装它,如果不使用Executors.callable().

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);
Run Code Online (Sandbox Code Playgroud)

正如其他人指出的那样,您可以使用invokeAll()适当的超时版本.在这个例子中,answers将包含一堆Future将返回null 的s(参见定义Executors.callable().可能你想要做的是轻微的重构,这样你就可以得到一个有用的答案,或者对底层的引用ComputeDTask,但我可以从你的例子中说出来.

如果不清楚,请注意invokeAll()在完成所有任务之前不会返回.(即,如果被问到Future,你的answers收藏中的所有s 都将报告.isDone().)这可以避免所有手动关机,等待终止等...并且ExecutorService如果需要,允许你将这个整齐地重复使用多个周期.

有关SO的一些相关问题:

对于你的问题,这些都不是严格意义上的,但它们确实为人们的思考Executor/ ExecutorService应该如何使用提供了一些颜色.

  • 如果您在批处理中添加所有作业并且挂在Callables列表中,这是完美的,但如果您在回调或事件循环情况下调用ExecutorService.submit(),它将无法工作. (9认同)
  • Downvoter:好奇您的想法是错误的。 (2认同)
  • 我认为值得一提的是,当不再需要ExecutorService时仍应调用shutdown(),否则线程永远不会终止(除了corePoolSize = 0或allowCoreThreadTimeOut = true的情况). (2认同)

NG.*_*NG. 57

如果要等待所有任务完成,请使用该shutdown方法而不是wait.然后跟着它awaitTermination.

此外,您可以使用Runtime.availableProcessors获取硬件线程的数量,以便您可以正确初始化线程池.

  • shutdown()停止ExecutorService接受新任务并关闭空闲工作线程.未指定等待关闭完成,并且ThreadPoolExecutor中的实现不等待. (21认同)
  • 如果要完成某项任务,必须安排更多任务,该怎么办?例如,您可以进行多线程树遍历,将分支交给工作线程.在这种情况下,由于ExecutorService立即关闭,因此无法接受任何递归调度的作业. (4认同)
  • `awaitTermination` 需要超时时间作为参数。虽然可以提供有限的时间并在它周围放置一个循环以等待所有线程完成,但我想知道是否有更优雅的解决方案。 (2认同)
  • 你是对的,但是看到这个答案 - http://stackoverflow.com/a/1250655/263895 - 你总是可以给它一个令人难以置信的长超时 (2认同)

seh*_*seh 48

如果等待完成中的所有任务ExecutorService并不是您的目标,而是等到特定批次的任务完成后,您可以使用CompletionService- 特别是,ExecutorCompletionService.

我们的想法是创建一个ExecutorCompletionService包裹你的Executor,提交一些已知数量的任务通过CompletionService,然后得出相同数量从结果的完成队列使用任一take()(其中块)或poll()(不).一旦您绘制了与您提交的任务相对应的所有预期结果,您就会知道它们已完成.

让我再说一次,因为界面并不明显:你必须知道你投入了多少东西CompletionService才能知道要抽出多少东西.这对于take()方法尤其重要:调用它一次太多,它会阻塞你的调用线程,直到其他一些线程提交另一个作业为止CompletionService.

一些示例显示了如何CompletionServiceJava Concurrency in Practice一书中使用.


mdm*_*dma 11

如果你想等待执行的服务执行完,调用shutdown(),然后,awaitTermination(单位,UNITTYPE) ,例如awaitTermination(1, MINUTE).ExecutorService不会阻止它自己的监视器,所以你不能使用wait等.


dre*_*ash 8

有几种方法。

您可以先调用ExecutorService.shutdown,然后调用ExecutorService.awaitTermination,它返回:

true如果此执行程序终止并且false在终止之前超时已过

所以:

有一个函数叫awaitTerminationBut a timeout ,里面必须提供。这并不能保证返回时所有任务都已完成。有办法实现这一点吗?

您只需awaitTermination循环调用即可。

使用awaitTermination

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

使用CountDownLatch

另一种选择是创建一个等于并行任务数量的CountDownLatch 。count每个线程调用countDownLatch.countDown();,而线程调用countDownLatch.await();

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

使用CyclicBarrier

另一种方法是使用循环屏障

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

还有其他方法,但这些方法需要更改您的初始要求,即:

使用 ExecutorService.execute() 提交任务时如何等待所有任务完成。


Ala*_*Dea 7

您可以等待某个时间间隔完成作业:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}
Run Code Online (Sandbox Code Playgroud)

或者您可以使用ExecutorService.提交(Runnable)并收集它返回的Future对象并依次调用get()以等待它们完成.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}
Run Code Online (Sandbox Code Playgroud)

InterruptedException对于正确处理非常重要.它可以让您或您的库的用户安全地终止长时间的过程.


小智 6

只是用

latch = new CountDownLatch(noThreads)
Run Code Online (Sandbox Code Playgroud)

在每个线程中

latch.countDown();
Run Code Online (Sandbox Code Playgroud)

并作为障碍

latch.await();
Run Code Online (Sandbox Code Playgroud)


Rav*_*abu 6

IllegalMonitorStateException的根本原因:

抛出此异常表示线程已尝试在对象的监视器上等待,或者在没有指定监视器的情况下通知在对象监视器上等待的其他线程.

从您的代码中,您刚刚在ExecutorService上调用了wait()而没有拥有锁.

下面的代码将修复 IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 
Run Code Online (Sandbox Code Playgroud)

按照以下方法之一等待完成已提交的所有任务ExecutorService.

  1. 通过所有迭代Future任务从submitExecutorService与阻塞调用检查状态get()Future对象

  2. 使用的invokeAllExecutorService

  3. 使用CountDownLatch

  4. 使用ForkJoinPoolnewWorkStealingPoolExecutors(由于Java 8)

  5. 按照oracle文档页面中的建议关闭池

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    
    Run Code Online (Sandbox Code Playgroud)

    如果要在使用选项5而不是选项1到4时优雅地等待所有任务完成,请更改

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    
    Run Code Online (Sandbox Code Playgroud)

    一个while(condition)它检查每1分钟.


Nit*_*iya 5

您可以使用ExecutorService.invokeAll方法,它将执行所有任务,并等待所有线程完成其任务。

这是完整的javadoc

您还可以使用此方法的用户重载版本来指定超时。

这是带有的示例代码 ExecutorService.invokeAll

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}
Run Code Online (Sandbox Code Playgroud)