vit*_*ums 2 java completable-future
我在“Java 8 in Action”中找不到任何关于为什么CompletableFuture故意忽略mayInterruptIfRunning. 但即便如此,我也没有真正看到 custom 的任何钩子cancel(boolean),这在中断不影响阻塞操作(例如 I/O 流、简单锁等)的情况下会派上用场。到目前为止,任务本身似乎是预期的钩子,在抽象级别上工作在Future这里没有任何好处。
因此,我要问的是为了从这种情况中挤出一些 neet 自定义取消机制而必须引入的最少样板代码集。
ACompletableFuture是封装以下三种状态之一的对象:
从“未完成”到其他状态之一的转换可以由传递给其工厂方法之一或CompletionStage实现方法之一的函数触发。
但是也可以调用complete或completeExceptionally在它上面。在这方面,呼吁cancel它与呼吁它具有相同的效果completeExceptionally(new CancellationException())。
这也是它的类名所暗示的,它是一个可以完成而不是[最终]完成的未来。该类不提供对可能在任意线程中运行的现有完成尝试的控制,并且它不会特别对待由其自身安排的完成尝试。
同样重要的是要理解,当通过CompletionStage实现方法链接操作时,产生的未来只代表链的最后一个阶段,取消它也只能影响最后一个阶段。
例如下面的代码
CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
String s = "value1";
System.out.println("return initial " + s);
return s;
}).thenApply(s -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
s = s.toUpperCase();
System.out.println("return transformed " + s);
return s;
}).thenAccept(s -> {
System.out.println("starting last stage");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("processed " + s);
});
cf.cancel(false);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
Run Code Online (Sandbox Code Playgroud)
将打印
return initial value1
return transformed VALUE1
Run Code Online (Sandbox Code Playgroud)
证明只有链的最后一个阶段被取消,而前一个阶段运行完成,完全不受影响。
保留对第一阶段的引用以尝试取消整个链只会在第一阶段尚未完成时起作用,因为尝试取消已经完成的未来是无效的。
long[] waitBeforeCancel = { 500, 1500 };
for(long l: waitBeforeCancel) {
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
String s = "value1";
System.out.println("return initial " + s);
return s;
});
first.thenApply(s -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
s = s.toUpperCase();
System.out.println("return transformed " + s);
return s;
}).thenAccept(s -> {
System.out.println("starting last stage");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("processed " + s);
});
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
System.out.println("Trying to cancel");
first.cancel(false);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
System.out.println();
}
Run Code Online (Sandbox Code Playgroud)
Trying to cancel
return initial value1
return initial value1
Trying to cancel
return transformed VALUE1
starting last stage
processed VALUE1
Run Code Online (Sandbox Code Playgroud)
这说明当第一阶段及时取消时整个链被取消(除了第一阶段Supplier的代码由于不存在中断而仍然完成),而取消太晚不会影响任何阶段。
记住所有CompletableFuture实例,以便能够取消所有实例,将违背 API 的目的。您可以使用跟踪所有当前处理的作业的执行程序,在最后一个阶段被取消时将取消和中断转发给它们。然后,CompletableFuture实现会将取消转发到相关阶段。这样,完成的阶段仍然可以被垃圾收集。
设置有点复杂;预先需要包装执行器来构造CompletableFuture链,取消的转发需要已构造链的最后阶段。这就是为什么我制作了一个实用方法,接受链构造代码作为Function<Executor,CompletableFuture<T>>:
static <T> Future<T> setupForInterruption(Function<Executor,CompletableFuture<T>> f) {
return setupForInterruption(f, ForkJoinPool.commonPool());
}
static <T> Future<T> setupForInterruption(
Function<Executor,CompletableFuture<T>> f, Executor e) {
AtomicBoolean dontAcceptMore = new AtomicBoolean();
Set<Future<?>> running = ConcurrentHashMap.newKeySet();
Executor wrapper = r -> {
if(dontAcceptMore.get()) throw new CancellationException();
FutureTask<?> ft = new FutureTask<>(r, null) {
@Override protected void done() { running.remove(this); }
};
running.add(ft);
e.execute(ft);
};
CompletableFuture<T> cf = f.apply(wrapper);
cf.whenComplete((v,t) -> {
if(cf.isCancelled()) {
dontAcceptMore.set(true);
running.removeIf(ft -> ft.cancel(true) || ft.isDone());
}
});
return cf;
}
Run Code Online (Sandbox Code Playgroud)
这可以像
long[] waitBeforeCancel = { 500, 1500, 2500, 3500 };
for(long l: waitBeforeCancel) {
Future<?> f = setupForInterruption(executor ->
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if(Thread.interrupted()) throw new IllegalStateException();
String s = "value1";
System.out.println("return initial " + s);
return s;
}, executor).thenApplyAsync(s -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if(Thread.interrupted()) throw new IllegalStateException();
s = s.toUpperCase();
System.out.println("return transformed " + s);
return s;
}, executor).thenAcceptAsync(s -> {
System.out.println("starting last stage");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if(Thread.interrupted()) throw new IllegalStateException();
System.out.println("processed " + s);
}, executor));
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
System.out.println("Trying to cancel");
f.cancel(true);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
System.out.println();
}
Run Code Online (Sandbox Code Playgroud)
Trying to cancel
return initial value1
Trying to cancel
return initial value1
return transformed VALUE1
starting last stage
Trying to cancel
return initial value1
return transformed VALUE1
starting last stage
processed VALUE1
Trying to cancel
Run Code Online (Sandbox Code Playgroud)
由于 API 使用Supplier、Function和Consumer,它们都不允许 throw InterruptedException,因此此示例代码对中断进行了显式测试并IllegalStateException改为throws 。这也是它使用parkNanos它在中断时立即返回而不是Thread.sleep首先返回的原因。在实际应用场景中,您可能会调用中断敏感方法,并且必须捕获InterruptedException、InterruptedIOException、 或InterruptedNamingException(等)并将它们转换为未经检查的异常。
请注意,上述方法总是会被中断取消,因为CompletableFuture不能说明取消是否有中断。如果要获取该参数的值,则需要一个前端Future实现,该实现反映上一阶段的结果,将取消转发给它,但将 的值传递mayInterruptIfRunning给当前正在运行的作业。
class FrontEnd<R> implements Future<R> {
final CompletableFuture<R> lastStage;
final Set<Future<?>> running;
FrontEnd(CompletableFuture<R> lastStage, Set<Future<?>> running) {
this.lastStage = lastStage;
this.running = running;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean didCancel = lastStage.cancel(false);
if(didCancel)
running.removeIf(f -> f.cancel(mayInterruptIfRunning) || f.isDone());
return didCancel;
}
@Override
public boolean isCancelled() {
return lastStage.isCancelled();
}
@Override
public boolean isDone() {
return lastStage.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return lastStage.get();
}
@Override
public R get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return lastStage.get(timeout, unit);
}
static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f) {
return setup(f, ForkJoinPool.commonPool());
}
static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f, Executor e) {
AtomicBoolean dontAcceptMore = new AtomicBoolean();
Set<Future<?>> running = ConcurrentHashMap.newKeySet();
Executor wrapper = r -> {
if(dontAcceptMore.get()) throw new CancellationException();
FutureTask<?> ft = new FutureTask<>(r, null) {
@Override protected void done() { running.remove(this); }
};
running.add(ft);
e.execute(ft);
};
CompletableFuture<T> cf = f.apply(wrapper);
cf.whenComplete((v,t) -> { if(cf.isCancelled()) dontAcceptMore.set(true); });
return new FrontEnd<>(cf, running);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
123 次 |
| 最近记录: |