在线程失败和异常处理时停止 ExecutorService

Mit*_*t94 6 java multithreading exception

这是我为了暴露我的问题而做的一个简化的例子。我有一些doSomeWork()使用 ExecutorService 以多线程方式处理的任务(最多一次 4 个线程)。但是,如果任何线程/任务生成异常,我想:

  1. 停止处理任何进一步的任务。

  2. 在主线程级别捕获异常。

    public static void main(String[] args) {
        final ExecutorService threadPool = Executors.newFixedThreadPool(4);
        final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(threadPool);
    
        try {
            for (int i = 0; i < 10; i++) {
                int b = i;
                    completionService.submit(() -> doSomeWork(b));
            }
    
            threadPool.shutdown();
            threadPool.awaitTermination(8, TimeUnit.HOURS);
    
            System.exit(0);
    
        } catch (Exception e) {
            System.out.println("Something wrong happened: " + e.getMessage());
        }
    
        System.exit(1);
    
    }
    
    //This function have 50% odds of throwing an exception
    public static Void doSomeWork(int i) throws Exception {
    
        Thread.sleep(500);
        if ((Math.random() > 0.5))
        {
            System.out.println("I have reached indice: " + i);
        }
        else
        {
            throw new Exception("I couldn't handle indice " + i);
        }
        return null;
    }
    
    Run Code Online (Sandbox Code Playgroud)

目前,执行会输出如下内容:

I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
I have reached indice: 4
I have reached indice: 6
I have reached indice: 7
I have reached indice: 5
I have reached indice: 9
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,indice 3丢失了,但剩余线程的执行已完成。它也没有输出任何有关异常的信息。

我想要的输出是这样的:

I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
Something wrong happened: I couldn't handle indice 3
Run Code Online (Sandbox Code Playgroud)

我发现的围绕这个问题的其他解决方案是使用带有 future 的可调用函数,但以阻塞方式。我不能在等待未来时阻止其他线程的执行,否则整个多线程就没有意义。

Mat*_*NNZ 5

您可以使用CompletableFuture. 这是我测试的你的主要功能:

final ExecutorService executorService = Executors.newFixedThreadPool(4);
final List<CompletableFuture<Void>> all = new ArrayList<>();

try {
    for (int i = 0; i < 10; i++) {
        int b = i;
        CompletableFuture<Void> v = CompletableFuture.runAsync(() -> {
                    try {
                        doSomeWork(b);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                },
                executorService);
        all.add(v);
    }

    CompletableFuture<Void> placeholder = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
    failFast(all, placeholder);

    System.out.println("All tasks ended");

} catch (Exception e) {
    System.out.println("Something wrong happened: " + e.getMessage());
} finally {
    executorService.shutdownNow();
}
Run Code Online (Sandbox Code Playgroud)

一旦其中一个失败(或全部完成),使共同未来失败的效用函数:

private static <T> void failFast(List<CompletableFuture<T>> futures, CompletableFuture<T> joint) {
    while (true) {
        if (joint.isDone()) {
            return;
        }
        for (CompletableFuture<T> future : futures) {
            if (future.isCompletedExceptionally()) {
                return;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我得到的输出:

I have reached indice: 1
I have reached indice: 7
I have reached indice: 5
I have reached indice: 4
Something wrong happened: java.lang.RuntimeException: java.lang.Exception: I couldn't handle indice 0
Run Code Online (Sandbox Code Playgroud)

解释:

该方法CompletableFuture.runAsync()允许您提供一个Runnable(您的doSomeWork)和一个具有一定数量线程的执行器。在这里,我传递了一个具有 4 个线程的执行器(正如您在示例中所做的那样)。

在可运行的内部,我不仅运行该doSomeWork函数,而且还捕获Exception并抛出一个RuntimeException(需要这样做,因为 Lambda 不支持检查异常,因此我需要将其包装到运行时异常中,但它仍然会中断执行并被被你的主要捕获)。

每次我CompletableFuture<Void>为具有给定索引的任务创建一个新任务时i,我都会将此结果存储到可完成的 future 列表中。

for 循环不需要执行任何内容,因为可完成的 future 是异步运行的。

因此,我创建了一个联合可完成的未来CompletableFuture.allOf(...),然后我failFast在这个未来上使用效用函数,以便在其中一个任务失败时立即停止(或继续直到所有任务完成)。

因此,基本上,一旦其中一个 future 未能抛出异常,联合 future 就被认为已完成,因此会将句柄留给主线程,同时主线程会被RuntimeExceptionlambda 表达式内抛出的异常击中。

注意:感谢 Thomas 的评论,我更新了代码以使用 anExecutorService而不是简单的Executor. 这允许您在捕获异常后调用块shutdownNow()内部。finally正如 Thomas 所建议的,您也可以直接在函数RuntimeException内部抛出 a doSomeWork,这样您就不需要捕获并包装在 lambda 表达式内。

其他说明: @matt 让我注意到一些我不知道的事情。当所有未来都完成时,无论成功与否,未来.allOf()都将完成。因此,正如他指出的那样,我的解决方案不会按原样起作用。我再次编辑了答案以考虑他的评论,感谢@matt 让我注意到。