如何知道CompletionService何时完成交付结果?

Ed *_*aty 5 java concurrency multithreading

我想用一个CompletionService来处理来自一系列线程的结果,因为他们完成.我有一个循环中的服务来获取它提供的Future对象,但我不知道确定所有线程何时完成(从而退出循环)的最佳方法:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class Bar {

    final static int MAX_THREADS = 4;
    final static int TOTAL_THREADS = 20;

    public static void main(String[] args) throws Exception{

        final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
        final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);

        for (int i=0; i<TOTAL_THREADS; i++){
            service.submit(new MyCallable(i));
        }

        int finished = 0;
        Future<Integer> future = null;
        do{
            future = service.take();
            int result = future.get();
            System.out.println("  took: " + result);
            finished++;             

        }while(finished < TOTAL_THREADS);

        System.out.println("Shutting down");
        threadPool.shutdown();
    }


    public static class MyCallable implements Callable<Integer>{

        final int id;

        public MyCallable(int id){
            this.id = id;
            System.out.println("Submitting: " + id);
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println("finished: " + id);
            return id;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我已经尝试检查ThreadPoolExecutor的状态,但我知道getCompletedTaskCount和getTaskCount方法只是近似值,不应该依赖它们.有没有更好的方法来确保我从CompletionService中检索所有Futures而不是自己计算它们?


编辑:Nobeh提供的链接,以及此链接建议计算提交的任务数,然后多次调用take(),这是要走的路.我很惊讶没有办法向CompletionService或其Executor询问还剩下什么.

Mar*_*ark 7

请参阅http://www.javaspecialists.eu/archive/Issue214.html以获取有关如何扩展 ExecutorCompletionService 以执行您正在寻找的操作的体面建议。为方便起见,我已在下面粘贴了相关代码。作者还建议让服务实现 Iterable,我认为这是一个好主意。

FWIW,我同意你的看法,这确实应该成为标准实施的一部分,但可惜,事实并非如此。

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
  private final AtomicLong submittedTasks = new AtomicLong();
  private final AtomicLong completedTasks = new AtomicLong();

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  public Future<V> submit(Callable<V> task) {
    Future<V> future = super.submit(task);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> submit(Runnable task, V result) {
    Future<V> future = super.submit(task, result);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}
Run Code Online (Sandbox Code Playgroud)


nob*_*beh 3

回答这些问题你就能得到答案吗?

  • 您的异步任务是否会创建提交给的其他任务CompletionService
  • service唯一应该处理应用程序中创建的任务的对象吗?

基于参考文档CompletionService根据消费者/生产者方法采取行动并利用内部Executor. 所以,只要你在一个地方生产任务,在另一个地方消费任务,CompletionService.take()就表示是否还有结果可以给出。

我相信这个问题也对你有帮助。

  • 需要强调的是,您需要跟踪提交给 CompletionService 的作业数量。`CompletionService.take()` 的 javadoc 暗示它将阻塞,直到收到另一个 `Future`。这意味着除非您使用提交的线程数来终止循环,否则您将无限期地阻塞。 (3认同)