Ed *_*aty 5 java concurrency multithreading
我想用一个CompletionService来处理来自一系列线程的结果,因为他们完成.我有一个循环中的服务来获取它提供的Future对象,但我不知道确定所有线程何时完成(从而退出循环)的最佳方法:
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class Bar {
final static int MAX_THREADS = 4;
final static int TOTAL_THREADS = 20;
public static void main(String[] args) throws Exception{
final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);
for (int i=0; i<TOTAL_THREADS; i++){
service.submit(new MyCallable(i));
}
int finished = 0;
Future<Integer> future = null;
do{
future = service.take();
int result = future.get();
System.out.println(" took: " + result);
finished++;
}while(finished < TOTAL_THREADS);
System.out.println("Shutting down");
threadPool.shutdown();
}
public static class MyCallable implements Callable<Integer>{
final int id;
public MyCallable(int id){
this.id = id;
System.out.println("Submitting: " + id);
}
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
System.out.println("finished: " + id);
return id;
}
}
}
Run Code Online (Sandbox Code Playgroud)
我已经尝试检查ThreadPoolExecutor的状态,但我知道getCompletedTaskCount和getTaskCount方法只是近似值,不应该依赖它们.有没有更好的方法来确保我从CompletionService中检索所有Futures而不是自己计算它们?
编辑:Nobeh提供的链接,以及此链接建议计算提交的任务数,然后多次调用take(),这是要走的路.我很惊讶没有办法向CompletionService或其Executor询问还剩下什么.
请参阅http://www.javaspecialists.eu/archive/Issue214.html以获取有关如何扩展 ExecutorCompletionService 以执行您正在寻找的操作的体面建议。为方便起见,我已在下面粘贴了相关代码。作者还建议让服务实现 Iterable,我认为这是一个好主意。
FWIW,我同意你的看法,这确实应该成为标准实施的一部分,但可惜,事实并非如此。
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
private final AtomicLong submittedTasks = new AtomicLong();
private final AtomicLong completedTasks = new AtomicLong();
public CountingCompletionService(Executor executor) {
super(executor);
}
public CountingCompletionService(
Executor executor, BlockingQueue<Future<V>> queue) {
super(executor, queue);
}
public Future<V> submit(Callable<V> task) {
Future<V> future = super.submit(task);
submittedTasks.incrementAndGet();
return future;
}
public Future<V> submit(Runnable task, V result) {
Future<V> future = super.submit(task, result);
submittedTasks.incrementAndGet();
return future;
}
public Future<V> take() throws InterruptedException {
Future<V> future = super.take();
completedTasks.incrementAndGet();
return future;
}
public Future<V> poll() {
Future<V> future = super.poll();
if (future != null) completedTasks.incrementAndGet();
return future;
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
Future<V> future = super.poll(timeout, unit);
if (future != null) completedTasks.incrementAndGet();
return future;
}
public long getNumberOfCompletedTasks() {
return completedTasks.get();
}
public long getNumberOfSubmittedTasks() {
return submittedTasks.get();
}
public boolean hasUncompletedTasks() {
return completedTasks.get() < submittedTasks.get();
}
}
Run Code Online (Sandbox Code Playgroud)
回答这些问题你就能得到答案吗?
CompletionService?service唯一应该处理应用程序中创建的任务的对象吗?基于参考文档,CompletionService根据消费者/生产者方法采取行动并利用内部Executor. 所以,只要你在一个地方生产任务,在另一个地方消费任务,CompletionService.take()就表示是否还有结果可以给出。
我相信这个问题也对你有帮助。
| 归档时间: |
|
| 查看次数: |
7498 次 |
| 最近记录: |