geo*_*ley 186 java multithreading executorservice threadpool
等待ExecutorService完成所有任务的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的工作 - 每个核心一个.现在我的设置如下:
ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}
ComputeDTask实现runnable.这似乎正确执行任务,但代码崩溃wait()了IllegalMonitorStateException.这很奇怪,因为我玩了一些玩具示例,它似乎工作.  
uniquePhrases包含数万个元素.我应该使用其他方法吗?我正在寻找尽可能简单的东西
and*_*soj 205
最简单的方法是使用ExecutorService.invokeAll()哪种方法可以满足您的要求.用你的说法,你需要修改或包装ComputeDTask实现Callable<>,这可以给你更多的灵活性.可能在你的应用程序中有一个有意义的实现Callable.call(),但是这里有一种方法来包装它,如果不使用Executors.callable().
ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());
for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}
List<Future<Object>> answers = es.invokeAll(todo);
正如其他人指出的那样,您可以使用invokeAll()适当的超时版本.在这个例子中,answers将包含一堆Future将返回null 的s(参见定义Executors.callable().可能你想要做的是轻微的重构,这样你就可以得到一个有用的答案,或者对底层的引用ComputeDTask,但我可以从你的例子中说出来.
如果不清楚,请注意invokeAll()在完成所有任务之前不会返回.(即,如果被问到Future,你的answers收藏中的所有s 都将报告.isDone().)这可以避免所有手动关机,等待终止等...并且ExecutorService如果需要,允许你将这个整齐地重复使用多个周期.
有关SO的一些相关问题:
对于你的问题,这些都不是严格意义上的,但它们确实为人们的思考Executor/ ExecutorService应该如何使用提供了一些颜色.
NG.*_*NG. 57
如果要等待所有任务完成,请使用该shutdown方法而不是wait.然后跟着它awaitTermination.  
此外,您可以使用Runtime.availableProcessors获取硬件线程的数量,以便您可以正确初始化线程池.
seh*_*seh 48
如果等待完成中的所有任务ExecutorService并不是您的目标,而是等到特定批次的任务完成后,您可以使用CompletionService- 特别是,ExecutorCompletionService.
我们的想法是创建一个ExecutorCompletionService包裹你的Executor,提交一些已知数量的任务通过CompletionService,然后得出相同数量从结果的完成队列使用任一take()(其中块)或poll()(不).一旦您绘制了与您提交的任务相对应的所有预期结果,您就会知道它们已完成.
让我再说一次,因为界面并不明显:你必须知道你投入了多少东西CompletionService才能知道要抽出多少东西.这对于take()方法尤其重要:调用它一次太多,它会阻塞你的调用线程,直到其他一些线程提交另一个作业为止CompletionService.
有一些示例显示了如何CompletionService在Java Concurrency in Practice一书中使用.
mdm*_*dma 11
如果你想等待执行的服务执行完,调用shutdown(),然后,awaitTermination(单位,UNITTYPE) ,例如awaitTermination(1, MINUTE).ExecutorService不会阻止它自己的监视器,所以你不能使用wait等.
有几种方法。
您可以先调用ExecutorService.shutdown,然后调用ExecutorService.awaitTermination,它返回:
true如果此执行程序终止并且false在终止之前超时已过
所以:
有一个函数叫
awaitTerminationBut a timeout ,里面必须提供。这并不能保证返回时所有任务都已完成。有办法实现这一点吗?
您只需awaitTermination循环调用即可。
使用awaitTermination:
此实现的完整示例:
public class WaitForAllToEnd {
    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }
        int count = 0;
        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }
    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}
使用CountDownLatch:
另一种选择是创建一个等于并行任务数量的CountDownLatch 。count每个线程调用countDownLatch.countDown();,而主线程调用countDownLatch.await();。
此实现的完整示例:
public class WaitForAllToEnd {
    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }
    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}
使用CyclicBarrier:
另一种方法是使用循环屏障
public class WaitForAllToEnd {
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }
    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}
还有其他方法,但这些方法需要更改您的初始要求,即:
使用 ExecutorService.execute() 提交任务时如何等待所有任务完成。
您可以等待某个时间间隔完成作业:
int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}
或者您可以使用ExecutorService.提交(Runnable)并收集它返回的Future对象并依次调用get()以等待它们完成.
ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}
InterruptedException对于正确处理非常重要.它可以让您或您的库的用户安全地终止长时间的过程.
小智 6
只是用
latch = new CountDownLatch(noThreads)
在每个线程中
latch.countDown();
并作为障碍
latch.await();
IllegalMonitorStateException的根本原因:
抛出此异常表示线程已尝试在对象的监视器上等待,或者在没有指定监视器的情况下通知在对象监视器上等待的其他线程.
从您的代码中,您刚刚在ExecutorService上调用了wait()而没有拥有锁.
下面的代码将修复 IllegalMonitorStateException
try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 
按照以下方法之一等待完成已提交的所有任务ExecutorService.
通过所有迭代Future任务从submit上ExecutorService与阻塞调用检查状态get()的Future对象
使用的invokeAll上ExecutorService 
使用ForkJoinPool或newWorkStealingPool的Executors(由于Java 8)
按照oracle文档页面中的建议关闭池
void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
   // Wait a while for existing tasks to terminate
   if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
       System.err.println("Pool did not terminate");
   }
} catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
}
如果要在使用选项5而不是选项1到4时优雅地等待所有任务完成,请更改
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
至
一个while(condition)它检查每1分钟.
您可以使用ExecutorService.invokeAll方法,它将执行所有任务,并等待所有线程完成其任务。
这是完整的javadoc
您还可以使用此方法的用户重载版本来指定超时。
这是带有的示例代码 ExecutorService.invokeAll
public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }
}
class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}
class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}
| 归档时间: | 
 | 
| 查看次数: | 180388 次 | 
| 最近记录: |