从CompletableFuture调用ExecutorService.shutdownNow

tso*_*akp 3 java java-8 java-threads completable-future

当已经运行的任务之一抛出异常时,我需要取消所有已调度但尚未运行的CompletableFuture任务.

尝试以下示例,但大多数情况下main方法不会退出(可能是由于某种类型的死锁).

public static void main(String[] args) {
    ExecutorService executionService = Executors.newFixedThreadPool(5);

    Set< CompletableFuture<?> > tasks = new HashSet<>();

    for (int i = 0; i < 1000; i++) {
        final int id = i;
        CompletableFuture<?> c = CompletableFuture

        .runAsync( () -> {
            System.out.println("Running: " + id); 
            if ( id == 400 ) throw new RuntimeException("Exception from: " + id);
        }, executionService )

        .whenComplete( (v, ex) -> { 
            if ( ex != null ) {
                System.out.println("Shutting down.");
                executionService.shutdownNow();
                System.out.println("shutdown.");
            }
        } );

        tasks.add(c);
    }

    try{ 
        CompletableFuture.allOf( tasks.stream().toArray(CompletableFuture[]::new) ).join(); 
    }catch(Exception e) { 
        System.out.println("Got async exception: " + e); 
    }finally { 
        System.out.println("DONE"); 
    }        
}
Run Code Online (Sandbox Code Playgroud)

上次打印输出是这样的:

Running: 402
Running: 400
Running: 408
Running: 407
Running: 406
Running: 405
Running: 411
Shutting down.
Running: 410
Running: 409
Running: 413
Running: 412
shutdown.
Run Code Online (Sandbox Code Playgroud)

尝试shutdownNow在单独的线程上运行方法,但它仍然在大多数情况下给出相同的死锁.

知道什么可能导致这种僵局吗?

你认为什么是在CompletableFuture抛出异常时取消所有已调度但尚未运行的s 的最佳方法?

正在考虑迭代tasks并呼吁cancel每一个CompletableFuture.但我不喜欢这个是CancellationException从那里抛出的join.

Hol*_*ger 5

你应该记住这一点

CompletableFuture<?> f = CompletableFuture.runAsync(runnable, executionService);
Run Code Online (Sandbox Code Playgroud)

基本上相当于

CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if(!f.isDone()) {
        try {
            runnable.run();
            f.complete(null);
        }
        catch(Throwable t) {
            f.completeExceptionally(t);
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

所以ExecutorService对此一无所知CompletableFuture,因此,一般不能取消它.它所拥有的只是一些工作,表达为实施Runnable.

换句话说,shutdownNow()将阻止执行待处理的作业,因此,剩余的期货将无法正常完成,但不会取消它们.然后,你呼吁join()所返回的未来,allOf由于未完成的未来将永远不会返回.

但请注意,预定的工作确实在做任何昂贵的事情之前检查未来是否已经完成.

因此,如果您将代码更改为

ExecutorService executionService = Executors.newFixedThreadPool(5);
Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();
AtomicBoolean canceled = new AtomicBoolean();

for(int i = 0; i < 1000; i++) {
    final int id = i;
    CompletableFuture<?> c = CompletableFuture
        .runAsync(() -> {
            System.out.println("Running: " + id); 
            if(id == 400) throw new RuntimeException("Exception from: " + id);
        }, executionService);
        c.whenComplete((v, ex) -> {
            if(ex != null && canceled.compareAndSet(false, true)) {
                System.out.println("Canceling.");
                for(CompletableFuture<?> f: tasks) f.cancel(false);
                System.out.println("Canceled.");
            }
        });
    tasks.add(c);
    if(canceled.get()) {
        c.cancel(false);
        break;
    }
}

try {
    CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
} catch(Exception e) {
    System.out.println("Got async exception: " + e);
} finally {
    System.out.println("DONE");
}
executionService.shutdown();
Run Code Online (Sandbox Code Playgroud)

一旦相关的未来被取消,runnable将不会被执行.由于取消和普通执行之间存在竞争,因此将操作更改为可能会有所帮助

.runAsync(() -> {
    System.out.println("Running: " + id); 
    if(id == 400) throw new RuntimeException("Exception from: " + id);
    LockSupport.parkNanos(1000);
}, executionService);
Run Code Online (Sandbox Code Playgroud)

模拟一些实际工作量.然后,您将看到在遇到异常后执行的操作较少.

由于异步异常甚至可能在提交循环仍在运行时发生,因此它使用a AtomicBoolean来检测这种情况并在这种情况下停止循环.


请注意,对于a CompletableFuture,取消和任何其他特殊完成之间没有区别.通话f.cancel(…)相当于f.completeExceptionally(new CancellationException()).因此,由于CompletableFuture.allOf在特殊情况下报告任何异常,因此很可能CancellationException不是触发异常.

如果你用两个cancel(false)调用替换complete(null),你会得到类似的效果,runnables将不会为已经完成的期货执行,但allOf会报告原始异常,因为它是唯一的例外.它还有另一个积极的作用:用一个null值完成比构建一个CancellationException(对于每个未来的未来)便宜得多,因此强制完成通过complete(null)运行得更快,防止更多的未来执行.

  • 是的,在 `runAsync` 的情况下,作业将被立即安排,因此取消 `CompletableFuture` 不再阻止作业的执行,因此只有 `isDone` 预检查会阻止你的 `Runnable` 的执行。这与例如 `future2 = future1.thenApplyAsync(x -&gt; x, executor)` 不同,其中作业仅在 `future1` 正常完成后调度,因此如果 `future2` 在 `future1` 完成之前被取消,这项工作甚至不会被安排。一旦被调度,作业的行为是相似的。是的,您现在可以重用 executor 服务。 (2认同)