tso*_*akp 3 java java-8 java-threads completable-future
当已经运行的任务之一抛出异常时,我需要取消所有已调度但尚未运行的CompletableFuture任务.
尝试以下示例,但大多数情况下main方法不会退出(可能是由于某种类型的死锁).
public static void main(String[] args) {
ExecutorService executionService = Executors.newFixedThreadPool(5);
Set< CompletableFuture<?> > tasks = new HashSet<>();
for (int i = 0; i < 1000; i++) {
final int id = i;
CompletableFuture<?> c = CompletableFuture
.runAsync( () -> {
System.out.println("Running: " + id);
if ( id == 400 ) throw new RuntimeException("Exception from: " + id);
}, executionService )
.whenComplete( (v, ex) -> {
if ( ex != null ) {
System.out.println("Shutting down.");
executionService.shutdownNow();
System.out.println("shutdown.");
}
} );
tasks.add(c);
}
try{
CompletableFuture.allOf( tasks.stream().toArray(CompletableFuture[]::new) ).join();
}catch(Exception e) {
System.out.println("Got async exception: " + e);
}finally {
System.out.println("DONE");
}
}
Run Code Online (Sandbox Code Playgroud)
上次打印输出是这样的:
Running: 402
Running: 400
Running: 408
Running: 407
Running: 406
Running: 405
Running: 411
Shutting down.
Running: 410
Running: 409
Running: 413
Running: 412
shutdown.
Run Code Online (Sandbox Code Playgroud)
尝试shutdownNow在单独的线程上运行方法,但它仍然在大多数情况下给出相同的死锁.
知道什么可能导致这种僵局吗?
你认为什么是在CompletableFuture抛出异常时取消所有已调度但尚未运行的s 的最佳方法?
正在考虑迭代tasks并呼吁cancel每一个CompletableFuture.但我不喜欢这个是CancellationException从那里抛出的join.
你应该记住这一点
CompletableFuture<?> f = CompletableFuture.runAsync(runnable, executionService);
Run Code Online (Sandbox Code Playgroud)
基本上相当于
CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
if(!f.isDone()) {
try {
runnable.run();
f.complete(null);
}
catch(Throwable t) {
f.completeExceptionally(t);
}
}
});
Run Code Online (Sandbox Code Playgroud)
所以ExecutorService对此一无所知CompletableFuture,因此,一般不能取消它.它所拥有的只是一些工作,表达为实施Runnable.
换句话说,shutdownNow()将阻止执行待处理的作业,因此,剩余的期货将无法正常完成,但不会取消它们.然后,你呼吁join()所返回的未来,allOf由于未完成的未来将永远不会返回.
但请注意,预定的工作确实在做任何昂贵的事情之前检查未来是否已经完成.
因此,如果您将代码更改为
ExecutorService executionService = Executors.newFixedThreadPool(5);
Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();
AtomicBoolean canceled = new AtomicBoolean();
for(int i = 0; i < 1000; i++) {
final int id = i;
CompletableFuture<?> c = CompletableFuture
.runAsync(() -> {
System.out.println("Running: " + id);
if(id == 400) throw new RuntimeException("Exception from: " + id);
}, executionService);
c.whenComplete((v, ex) -> {
if(ex != null && canceled.compareAndSet(false, true)) {
System.out.println("Canceling.");
for(CompletableFuture<?> f: tasks) f.cancel(false);
System.out.println("Canceled.");
}
});
tasks.add(c);
if(canceled.get()) {
c.cancel(false);
break;
}
}
try {
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
} catch(Exception e) {
System.out.println("Got async exception: " + e);
} finally {
System.out.println("DONE");
}
executionService.shutdown();
Run Code Online (Sandbox Code Playgroud)
一旦相关的未来被取消,runnable将不会被执行.由于取消和普通执行之间存在竞争,因此将操作更改为可能会有所帮助
.runAsync(() -> {
System.out.println("Running: " + id);
if(id == 400) throw new RuntimeException("Exception from: " + id);
LockSupport.parkNanos(1000);
}, executionService);
Run Code Online (Sandbox Code Playgroud)
模拟一些实际工作量.然后,您将看到在遇到异常后执行的操作较少.
由于异步异常甚至可能在提交循环仍在运行时发生,因此它使用a AtomicBoolean来检测这种情况并在这种情况下停止循环.
请注意,对于a CompletableFuture,取消和任何其他特殊完成之间没有区别.通话f.cancel(…)相当于f.completeExceptionally(new CancellationException()).因此,由于CompletableFuture.allOf在特殊情况下报告任何异常,因此很可能CancellationException不是触发异常.
如果你用两个cancel(false)调用替换complete(null),你会得到类似的效果,runnables将不会为已经完成的期货执行,但allOf会报告原始异常,因为它是唯一的例外.它还有另一个积极的作用:用一个null值完成比构建一个CancellationException(对于每个未来的未来)便宜得多,因此强制完成通过complete(null)运行得更快,防止更多的未来执行.
| 归档时间: |
|
| 查看次数: |
213 次 |
| 最近记录: |