如何使用ExecutorService等待所有线程完成?

ser*_*erg 362 java parallel-processing concurrency multithreading executorservice

我需要一次执行4个任务,如下所示:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
Run Code Online (Sandbox Code Playgroud)

一旦完成所有内容,我该如何收到通知?现在我想不出比设置一些全局任务计数器更好的事情,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变为0; 或获得一个Futures列表,并在无限循环监视器isDone中为所有这些.什么是更好的解决方案不涉及无限循环?

谢谢.

cle*_*tus 427

基本上ExecutorService你打电话shutdown()然后awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
Run Code Online (Sandbox Code Playgroud)

  • 我正在寻找任何官方文档,"Long.MAX_VALUE,TimeUnit.NANOSECONDS"相当于没有超时. (42认同)
  • 如果此任务处理是一次性事件,那么这是一个很好的模式.但是,如果在同一运行时重复执行此操作,则不是最佳选择,因为每次执行时都会重复创建和拆除线程. (27认同)
  • @SamHarwell在`Timing下看``java.util.concurrent`包[documentation](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html) `section:*要等待"forever",你可以使用`Long.MAX_VALUE`*的值 (17认同)
  • 我不敢相信你必须使用shutdown才能加入所有当前线程(在使用shutdown之后,你不能再次使用执行程序).建议使用Future的列表代替...... (14认同)
  • 这正是shutdown/awaitTermination的意思 (7认同)
  • IMO,没有超时可能不是明智之举.你的任务总是有可能卡住/死锁,用户会永远等待. (4认同)
  • @Kejia 这不是不完整的......这样想:你的主线程正在循环中产生子线程。添加完成后,运行此代码。**您可能会看到输出乱序**,但您的所有任务都将被分拆。谢谢@Cletus! (2认同)
  • 当我对此进行测试时,我发现执行程序在所有任务完成之前就完成了。如果我在调用 shutdown() 之前使用 Thread.sleep(100) 它工作。因此,我怀疑如果将任务添加到关闭调用之间的时间太短,您的代码可能存在一些计时问题。 (2认同)
  • 如果您不想关闭执行程序,这将无法工作.对于一个永久运行的批次类的东西,你需要提交工作并等待它们完成才能继续前进.在这种情况下,闩锁或屏障比关机更有意义. (2认同)
  • @SamHarwell `TimeUnit.NANOSECONDS` 和 `Long.MAX_VALUE` 等于 106,751 天或 292 年(`TimeUnit.NANOSECONDS.toDays(Long.MAX_VALUE);`),这应该足够了,或者使用一些更大的 TimeUnit。 (2认同)
  • executor.shutdown(); 正在停止我的程序! (2认同)

Chs*_*y76 167

使用CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}
Run Code Online (Sandbox Code Playgroud)

并在你的任务中(包含在try/finally中)

latch.countDown();
Run Code Online (Sandbox Code Playgroud)

  • @cletus - 然后你不使用CountDownLatch :-)请注意,我并不是说这种方法比你的方法更好.但是,我发现在现实生活中,我知道任务的数量,每个部署需要配置线程池设置_do_,并且可以重用池_can_.所以我通常有Spring注入的线程池并将它们设置为原型并手动关闭它们****只是为了等待线程完成似乎不太理想. (11认同)
  • 没有4个任务.一次完成4个"一些任务". (3认同)
  • 我发现这个解决方案比其他解决方案更优雅,它看起来就像是为了这个目的而制作的,它简单明了. (3认同)
  • 如果你在开始之前不知道任务的数量怎么办? (3认同)
  • 对不起,我误解了这个问题.是的,任务数应该是CountDownLatch构造函数的参数 (2认同)

sjl*_*lee 82

ExecutorService.invokeAll() 为你做到了.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
Run Code Online (Sandbox Code Playgroud)

  • @从JavaDoc中(在答题链接)AlikElzin-kilaka引用:"执行给定的任务,返回保持状态期货和结果列表当所有完整的Future.isDone()的返回列表中的每个元素真的. " (5认同)
  • 此方法仅在您事先了解任务数量时才有效. (3认同)
  • 我认为当'期货'被退回时,任务还没有完成.它们可能在将来完成,您将获得结果的链接.这就是为什么它被称为"未来".你有方法[Future.get()](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#get--),它将等待任务完成得到一个结果. (3认同)
  • 请注意, executorService.invokeAll 将等待所有线程完成,但您仍需要调用 executorService.shutdown 来清理您的线程池。 (2认同)

rog*_*ack 44

您还可以使用期货清单:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));
Run Code Online (Sandbox Code Playgroud)

然后,当你想要加入所有这些时,它基本上相当于加入每个,(带来额外的好处,它将子线程的异常重新引发到main):

for(Future f: this.futures) { f.get(); }
Run Code Online (Sandbox Code Playgroud)

基本上诀窍是一次调用每个Future上的.get(),而不是无限循环调用isDone()on(all或each).因此,一旦最后一个线程完成,您就可以保证"继续"前进并经过此块.需要注意的是,由于.get()调用会重新引发异常,如果其中一个线程死掉,你可能会在其他线程完成之前从这里加注[为了避免这种情况,你可以catch ExecutionException在get调用周围添加一个].另一个警告是它保留对所有线程的引用,所以如果它们有线程局部变量,它们将不会被收集,直到你通过这个块之后(尽管你可能能够解决这个问题,如果它成为一个问题,通过删除未来离开ArrayList).如果你想知道哪个"首先完成"/sf/answers/2231952061/

  • 要知道哪个"先完成",请使用`ExecutorCompletionService.take`:http://stackoverflow.com/a/11872604/199364 (3认同)

Ada*_*ker 32

在Java8中,您可以使用CompletableFuture执行此操作:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
Run Code Online (Sandbox Code Playgroud)

  • 这是一个非常优雅的解决方案。 (4认同)

str*_*yba 25

只是我的两分钱.为了克服CountDownLatch事先知道任务数量的要求,你可以用一种简单的旧方式来做Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...
Run Code Online (Sandbox Code Playgroud)

在你的任务中,只需s.release()按你的意愿打电话latch.countDown();


Pek*_*erg 12

Java 5及更高版本中的CyclicBarrier类是为此类设计的.

  • 很酷,永远不会记住这个数据结构的名称.但是,只有事先知道将排队的任务量才适用. (7认同)

Răz*_*scu 12

游戏有点晚了但是为了完成...

而不是"等待"所有任务完成,你可以考虑好莱坞原则,"不要打电话给我,我会打电话给你" - 当我完成时.我认为结果代码更优雅......

Guava提供了一些有趣的工具来实现这一目标.

一个例子 ::

将ExecutorService包装到ListeningExecutorService ::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Run Code Online (Sandbox Code Playgroud)

提交一系列可执行的callables ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});
Run Code Online (Sandbox Code Playgroud)

现在必不可少的部分:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
Run Code Online (Sandbox Code Playgroud)

附加一个回调给ListenableFuture,你可以用来在所有期货完成时收到通知::

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });
Run Code Online (Sandbox Code Playgroud)

这也提供了一个优点,您可以在处理完成后在一个地方收集所有结果...

更多信息在这里

  • 很干净。即使在Android上也可以正常工作。只是必须在onSuccess()中使用`runOnUiThread()`。 (2认同)

Rav*_*abu 7

请遵循以下方法之一.

  1. 通过迭代所有未来的任务,从返回submitExecutorService,并与阻止呼叫检查状态get()Future所建议的对象Kiran
  2. invokeAll()ExecutorService上使用
  3. CountDownLatch
  4. ForkJoinPoolExecutors.html#newWorkStealingPool
  5. shutdown, awaitTermination, shutdownNow按正确顺序使用ThreadPoolExecutor的API

相关的SE问题:

CountDownLatch如何在Java多线程中使用?

如何正确关闭java ExecutorService


Zed*_*Zed 5

你可以将你的任务包装在另一个runnable中,它将发送通知:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
Run Code Online (Sandbox Code Playgroud)


小智 5

这是两个选择,请稍稍混淆哪个是最好的选择。

选项1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
Run Code Online (Sandbox Code Playgroud)

选项2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();
Run Code Online (Sandbox Code Playgroud)

这里放future.get(); 在尝试捕获中是个好主意吧?