ser*_*erg 362 java parallel-processing concurrency multithreading executorservice
我需要一次执行4个任务,如下所示:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
Run Code Online (Sandbox Code Playgroud)
一旦完成所有内容,我该如何收到通知?现在我想不出比设置一些全局任务计数器更好的事情,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变为0; 或获得一个Futures列表,并在无限循环监视器isDone中为所有这些.什么是更好的解决方案不涉及无限循环?
谢谢.
cle*_*tus 427
基本上ExecutorService
你打电话shutdown()
然后awaitTermination()
:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
Run Code Online (Sandbox Code Playgroud)
Chs*_*y76 167
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
Run Code Online (Sandbox Code Playgroud)
并在你的任务中(包含在try/finally中)
latch.countDown();
Run Code Online (Sandbox Code Playgroud)
sjl*_*lee 82
ExecutorService.invokeAll()
为你做到了.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
Run Code Online (Sandbox Code Playgroud)
rog*_*ack 44
您还可以使用期货清单:
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
Run Code Online (Sandbox Code Playgroud)
然后,当你想要加入所有这些时,它基本上相当于加入每个,(带来额外的好处,它将子线程的异常重新引发到main):
for(Future f: this.futures) { f.get(); }
Run Code Online (Sandbox Code Playgroud)
基本上诀窍是一次调用每个Future上的.get(),而不是无限循环调用isDone()on(all或each).因此,一旦最后一个线程完成,您就可以保证"继续"前进并经过此块.需要注意的是,由于.get()调用会重新引发异常,如果其中一个线程死掉,你可能会在其他线程完成之前从这里加注[为了避免这种情况,你可以catch ExecutionException
在get调用周围添加一个].另一个警告是它保留对所有线程的引用,所以如果它们有线程局部变量,它们将不会被收集,直到你通过这个块之后(尽管你可能能够解决这个问题,如果它成为一个问题,通过删除未来离开ArrayList).如果你想知道哪个"首先完成"/sf/answers/2231952061/
Ada*_*ker 32
在Java8中,您可以使用CompletableFuture执行此操作:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
Run Code Online (Sandbox Code Playgroud)
str*_*yba 25
只是我的两分钱.为了克服CountDownLatch
事先知道任务数量的要求,你可以用一种简单的旧方式来做Semaphore
.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
Run Code Online (Sandbox Code Playgroud)
在你的任务中,只需s.release()
按你的意愿打电话latch.countDown();
Răz*_*scu 12
游戏有点晚了但是为了完成...
而不是"等待"所有任务完成,你可以考虑好莱坞原则,"不要打电话给我,我会打电话给你" - 当我完成时.我认为结果代码更优雅......
Guava提供了一些有趣的工具来实现这一目标.
一个例子 ::
将ExecutorService包装到ListeningExecutorService ::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Run Code Online (Sandbox Code Playgroud)
提交一系列可执行的callables ::
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
Run Code Online (Sandbox Code Playgroud)
现在必不可少的部分:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
Run Code Online (Sandbox Code Playgroud)
附加一个回调给ListenableFuture,你可以用来在所有期货完成时收到通知::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.info("@@ finished processing {} elements", Iterables.size(result));
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
log.info("@@ failed because of :: {}", t);
}
});
Run Code Online (Sandbox Code Playgroud)
这也提供了一个优点,您可以在处理完成后在一个地方收集所有结果...
更多信息在这里
请遵循以下方法之一.
submit
的ExecutorService
,并与阻止呼叫检查状态get()
上Future
所建议的对象Kiran
invokeAll()
在ExecutorService上使用shutdown, awaitTermination, shutdownNow
按正确顺序使用ThreadPoolExecutor的API相关的SE问题:
你可以将你的任务包装在另一个runnable中,它将发送通知:
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
Run Code Online (Sandbox Code Playgroud)
小智 5
这是两个选择,请稍稍混淆哪个是最好的选择。
选项1:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
Run Code Online (Sandbox Code Playgroud)
选项2:
ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
futures.add(es.submit(task));
}
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
// do logging and nothing else
}
}
es.shutdown();
Run Code Online (Sandbox Code Playgroud)
这里放future.get(); 在尝试捕获中是个好主意吧?
归档时间: |
|
查看次数: |
303114 次 |
最近记录: |