How do I stop all running threads if one of them throws an exception?

Ach*_*u S 11 java multithreading

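In one of my applications, I am using the ExecutorService class to create a fixed thread pool and a CountDownLatch to wait for the threads to complete. This works fine as long as the process does not throw any exception. If an exception occurs in any of the threads, I need to stop all the running threads and report the error to the main thread. Can anyone help me solve this?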

Here is the sample code I use to execute multiple threads.

    private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
        try
        {
            CountDownLatch latch = new CountDownLatch(noOfThreads);
            for (int i = 0; i < noOfThreads; i++) {
                executor.submit(new ThreadExecutor(latch));
            }
            // blocks until every task has counted down
            latch.await();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            executor.shutdown();
        }
    }

This is the executor class:

    public class ThreadExecutor implements Callable<String> {
        CountDownLatch latch;

        public ThreadExecutor(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public String call() throws Exception
        {
            doMyTask(); // process logic goes here!
            this.latch.countDown(); // skipped if doMyTask() throws
            return "Success";
        }
    }

=============================================================================

Thanks, everyone :)

I have corrected my classes as shown below, and it is working now.

private void executeThreads()
{
    int noOfThreads = 10;
    ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
    // ThreadExecutor is a Callable<String>, so submit() returns Future<String>
    ArrayList<Future<String>> futureList = new ArrayList<Future<String>>(noOfThreads);
    try
    {
        userContext = BSF.getMyContext();
        CountDownLatch latch = new CountDownLatch(noOfComponentsToImport);

        for (ImportContent artifact : artifactList) {
            futureList.add(executor.submit(new ThreadExecutor(latch)));
        }

        latch.await();

        // get() rethrows a task's exception wrapped in ExecutionException
        for (Future<String> future : futureList)
        {
            try
            {
                future.get();
            }
            catch (ExecutionException e)
            {
                // handle it
            }
        }
    }
    catch (Exception e) {
        // handle it
    }
    finally
    {
        executor.shutdown();

        try
        {
            executor.awaitTermination(90000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            // handle it
        }
    }
}

Executor class:

public class ThreadExecutor implements Callable<String> {
    // Shared by all instances; once set, tasks that have not started yet skip their work
    private static volatile boolean isAnyError;
    CountDownLatch latch;

    public ThreadExecutor(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public String call() throws Exception
    {
        try {
            if (!isAnyError)
            {
                doMyTask(); // process logic goes here!
            }
        }
        catch (Exception e)
        {
            isAnyError = true;
            throw e;
        }
        finally
        {
            // always count down, even on failure, so latch.await() cannot hang
            this.latch.countDown();
        }
        return "Success";
    }
}
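One thing to watch with the static isAnyError flag: it is shared by every batch in the JVM and is never reset, so a later batch would skip all of its work. A minimal sketch of a per-batch variant follows, assuming one AtomicBoolean is created per batch. The names PerBatchFailureFlag, runBatch and failingIndex are hypothetical, and the single-thread executor is only there to make the execution order deterministic for the demonstration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PerBatchFailureFlag {

    /**
     * Runs taskCount tasks one after another; the task whose index equals
     * failingIndex throws (pass -1 for no failure).
     * Returns how many tasks actually started their work.
     */
    static int runBatch(int taskCount, int failingIndex) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean failed = new AtomicBoolean(false); // scoped to this batch only
        AtomicInteger workStarted = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            final int index = i;
            executor.submit(() -> {
                if (failed.get()) {
                    return "Skipped"; // an earlier task in this batch failed
                }
                try {
                    workStarted.incrementAndGet();
                    if (index == failingIndex) {
                        throw new IllegalStateException("task " + index + " failed");
                    }
                    return "Success";
                } catch (Exception e) {
                    failed.set(true); // tell the tasks that have not started yet
                    throw e;
                }
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workStarted.get();
    }

    public static void main(String[] args) {
        System.out.println("started with failure at 0: " + runBatch(3, 0));
        System.out.println("started without failure:   " + runBatch(3, -1));
    }
}
```

Like the static flag, this only stops tasks that have not started yet; a task that is already running keeps going unless it is interrupted or checks the flag itself.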

art*_*tol 5

Use an ExecutorCompletionService, together with an ExecutorService that outlives the duration of the tasks (i.e. it does not get shut down afterwards):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Threader {

    static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        new Threader().start();
        service.shutdown();
    }

    private void start() {
        CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
                service);
        /*
         * Holds all the futures for the submitted tasks
         */
        List<Future<Void>> results = new ArrayList<Future<Void>>();

        for (int i = 0; i < 3; i++) {
            final int callableNumber = i;

            results.add(completionService.submit(new Callable<Void>() {

                @Override
                public Void call() throws Exception {
                    System.out.println("Task " + callableNumber + " in progress");
                    try {
                        Thread.sleep(callableNumber * 1000);
                    } catch (InterruptedException ex) {
                        System.out.println("Task " + callableNumber + " cancelled");
                        return null;
                    }
                    if (callableNumber == 1) {
                        throw new Exception("Wrong answer for task " + callableNumber);
                    }
                    System.out.println("Task " + callableNumber + " complete");
                    return null;
                }
            }));
        }

        boolean complete = false;
        while (!complete) {
            complete = true;
            Iterator<Future<Void>> futuresIt = results.iterator();
            while (futuresIt.hasNext()) {
                if (futuresIt.next().isDone()) {
                    futuresIt.remove();
                } else {
                    complete = false;
                }
            }

            if (!results.isEmpty()) {
                try {
                /*
                 * Examine results of next completed task
                 */
                    completionService.take().get();
                } catch (InterruptedException e) {
                /*
                 * Give up - interrupted.
                 */
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                /*
                 * The task threw an exception
                 */
                    System.out.println("Execution exception " + e.getMessage());
                    complete = true;
                    for (Future<Void> future : results) {
                        if (!future.isDone()) {
                            System.out.println("Cancelling " + future);
                            future.cancel(true);
                        }
                    }
                }
            }
        }

    }
}

The output is similar to:

Task 0 in progress
Task 2 in progress
Task 1 in progress
Task 0 complete
Execution exception java.lang.Exception: Wrong answer for task 1
Cancelling java.util.concurrent.FutureTask@a59698
Task 2 cancelled

Task 2 is cancelled because task 1 failed.
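Keep in mind that future.cancel(true) only interrupts the worker thread. Task 2 notices because Thread.sleep throws InterruptedException; a task that never blocks has to check the interrupt flag itself. Here is a rough sketch of such a task, assuming a CPU-bound loop. The class name InterruptAwareTask and the method cancelAndObserve are made up for illustration, and Thread.yield() stands in for one unit of real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InterruptAwareTask {

    /** Starts a busy worker, cancels it, and reports whether it observed the interrupt. */
    static boolean cancelAndObserve() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch sawInterrupt = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                // No blocking call in this loop, so the flag must be polled explicitly.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.yield(); // placeholder for one unit of real work
                }
                sawInterrupt.countDown();
            });

            started.await();      // make sure the task is actually running
            future.cancel(true);  // true = interrupt the thread running the task
            return sawInterrupt.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker observed interrupt: " + cancelAndObserve());
    }
}
```

If the loop did not poll isInterrupted(), cancel(true) would still mark the Future as cancelled, but the thread would keep running.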


Mar*_*nik 4

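I strongly suggest you use a robust mechanism to count down the latch: wrap the task in an all-encompassing try-finally { latch.countDown(); }. Detect errors in the threads using a separate mechanism.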
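A minimal sketch of that idea, assuming an AtomicReference is an acceptable "separate mechanism" for recording the first error. The names LatchWithErrorSlot, runAll and failingIndex are hypothetical and the task body is a placeholder.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class LatchWithErrorSlot {

    /**
     * Runs taskCount workers; the worker whose index equals failingIndex throws
     * (pass -1 for no failure). Returns the first recorded error, or null if
     * every worker succeeded. If the caller is interrupted while waiting, the
     * InterruptedException itself is returned.
     */
    static Throwable runAll(int taskCount, int failingIndex) {
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                final int index = i;
                executor.execute(() -> {
                    try {
                        // placeholder for the real task body
                        if (index == failingIndex) {
                            throw new IllegalStateException("task " + index + " failed");
                        }
                    } catch (Throwable t) {
                        // error detection is separate from the latch: keep the first failure
                        firstError.compareAndSet(null, t);
                    } finally {
                        // runs on success and on failure, so await() cannot hang
                        latch.countDown();
                    }
                });
            }
            latch.await();
            return firstError.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            executor.shutdownNow(); // interrupts any worker that is still running
        }
    }

    public static void main(String[] args) {
        System.out.println("no failure -> " + runAll(4, -1));
        System.out.println("failure    -> " + runAll(4, 2));
    }
}
```

The latch only answers "has everyone finished?", while the error slot answers "did anything go wrong?", so a failing task can never leave the main thread blocked in await().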