我应该何时在ExecutorService上使用CompletionService?

rip*_*234 74 java concurrency multithreading completion-service

我刚刚在这篇博客文章中找到了CompletionService .但是,这并没有真正展示CompletionService相对于标准ExecutorService的优势.可以用任何一个编写相同的代码.那么,什么时候CompletionService有用呢?

你能给一个简短的代码样本,使它清晰吗?例如,此代码示例仅显示不需要CompletionService的位置(=等效于ExecutorService)

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
Run Code Online (Sandbox Code Playgroud)

Ale*_*ler 146

省略许多细节:

  • ExecutorService =传入队列+工作线程
  • CompletionService =传入队列+工作线程+输出队列


Bha*_*kar 96

使用ExecutorService,一旦提交了要运行的任务,就需要手动编写代码以有效地获取已完成任务的结果.

有了CompletionService,这几乎是自动化的.由于您只提交了一项任务,因此您提供的代码中的差异不是很明显.但是,假设您有一个要提交的任务列表.在下面的示例中,将多个任务提交给CompletionService.然后,它不是试图找出已完成的任务(以获得结果),而是要求CompletionService实例在结果可用时返回结果.

public class CompletionServiceTest {

        class CalcResult {
             long result ;

             CalcResult(long l) {
                 result = l;
             }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName ;
            long  input1 ;
            int input2 ;

            CallableTask(String name , long v1 , int v2 ) {
                taskName = name;
                input1 = v1;
                input2 = v2 ;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for(int i=0;i<input2 ;i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1) ;
            }

        }

        public void test(){
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i=0;i< submittedTasks;i++) {
                taskCompletionService.submit(new CallableTask (
                        String.valueOf(i), 
                            (i * 10), 
                            ((i * 10) + 10  )
                        ));
               System.out.println("Task " + String.valueOf(i) + "subitted");
            }
            for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task availble in queue.Trying to get()");
                    // above call blocks till atleast one task is completed and results availble for it
                    // but we dont have to worry which one

                    // process the result here by doing result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));

                } catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

  • 另一个例子请参阅Java Concurrency in Practice pg.130.有一个CompletionService用于在图像可用时呈现图像. (5认同)
  • 有没有更好的方法来了解ExecutorCompletionService何时完成所有任务,而不是跟踪提交的任务数量? (2认同)

Tim*_*der 10

我认为javadoc最能回答的问题是什么时候这种CompletionService方式有用ExecutorService.

一种服务,它将新异步任务的生成与已完成任务的结果消耗分离.

基本上,这个接口允许程序让生产者创建和提交任务(甚至检查这些提交的结果),而不知道这些任务结果的任何其他消费者.同时,消费者在不知道提交任务的生产者的情况下了解CompletionService可能polltake结果.

为了记录,我可能是错的,因为它已经很晚了,但我相当确定该博客文章中的示例代码会导致内存泄漏.如果没有活跃的消费者从ExecutorCompletionService内部队列中取出结果,我不确定博主是如何预期排队的.


Jed*_*ith 10

基本上,CompletionService如果要并行执行多个任务,然后按完成顺序使用它们,则使用a .所以,如果我执行5个工作,那CompletionService将会给我第一个完成的工作.Executor除了提交一个任务之外,只有一个任务的例子不会带来额外的价值Callable.


Skl*_*vit 5

首先,如果我们不想浪费处理器时间,我们就不会使用

while (!future.isDone()) {
        // Do some work...
}
Run Code Online (Sandbox Code Playgroud)

我们必须使用

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);
Run Code Online (Sandbox Code Playgroud)

这段代码的坏处是它会关闭ExecutorService。如果我们想继续使用它(即我们有一些递归任务创建),我们有两种选择:invokeAll 或ExecutorService.

invokeAll将等到所有任务完成。ExecutorService使我们能够一一获取或投票结果。

最后,递归示例:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

while (Tasks.size() > 0) {
    for (final Task task : Tasks) {
        completionService.submit(new Callable<String>() {   
            @Override
            public String call() throws Exception {
                return DoTask(task);
            }
        });
    } 

    try {                   
        int taskNum = Tasks.size();
        Tasks.clear();
        for (int i = 0; i < taskNum; ++i) {
            Result result = completionService.take().get();
            if (result != null)
                Tasks.add(result.toTask());
        }           
    } catch (InterruptedException e) {
    //  error :(
    } catch (ExecutionException e) {
    //  error :(
    }
}
Run Code Online (Sandbox Code Playgroud)