假设我有一些异步计算,例如:
CompletableFuture
.supplyAsync(() -> createFoo())
.thenAccept(foo -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)
如果异步供应商根据某些指定的超时超时,是否有一种很好的方法为foo提供默认值?理想情况下,此类功能也会尝试取消运行缓慢的供应商.例如,是否存在类似于以下假设代码的标准库功能:
CompletableFuture
.supplyAsync(() -> createFoo())
.acceptEither(
CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
foo -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)
或者甚至更好:
CompletableFuture
.supplyAsync(() -> createFoo())
.withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
.thenAccept(foo -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)
我知道get(timeout, unit),但我想知道是否有一种更好的标准方式,以异步和反应方式应用超时,如上面的代码所示.
编辑:这是一个受Java 8启发的解决方案:在lambda表达式中强制检查异常处理.为什么强制,不是可选的?,但不幸的是它阻止了一个线程.如果我们依赖createFoo()来异步检查超时并抛出自己的超时异常,它可以在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的成本(可以没有"快速投掷"而昂贵
static <T> Supplier<T> wrapped(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (RuntimeException e1) {
throw e1;
} catch (Throwable e2) {
throw new RuntimeException(e2);
}
};
}
CompletableFuture
.supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
.exceptionally(e -> "default")
.thenAcceptAsync(s -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)
Rub*_*ben 16
CompletableFuture.supplyAsync只是一个帮助方法,可以为您创建CompletableFuture,并将任务提交给ForkJoin池.
您可以按照以下要求创建自己的supplyAsync:
private static final ScheduledExecutorService schedulerExecutor =
Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService =
Executors.newCachedThreadPool();
public static <T> CompletableFuture<T> supplyAsync(
final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
T defaultValue) {
final CompletableFuture<T> cf = new CompletableFuture<T>();
// as pointed out by Peti, the ForkJoinPool.commonPool() delivers a
// ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
// Using Executors.newCachedThreadPool instead in the example
// submit task
Future<?> future = executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});
//schedule watcher
schedulerExecutor.schedule(() -> {
if (!cf.isDone()) {
cf.complete(defaultValue);
future.cancel(true);
}
}, timeoutValue, timeUnit);
return cf;
}
Run Code Online (Sandbox Code Playgroud)
使用该帮助器创建CompletableFuture就像在CompletableFuture中使用静态方法一样简单:
CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
TimeUnit.SECONDS, "default");
Run Code Online (Sandbox Code Playgroud)
测试它:
a = supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
// ignore
}
return "hi";
}, 1, TimeUnit.SECONDS, "default");
Run Code Online (Sandbox Code Playgroud)
在Java 9中,将有completeOnTimeout(T值,长超时,TimeUnit单元),它可以满足您的需求,但它不会取消缓慢的供应商.
还有一个orTimeout(长超时,TimeUnit单元),在超时的情况下异常完成.
DZone有一篇很好的文章如何解决这个问题:https://dzone.com/articles/asynchronous-timeouts
我不确定代码的版权,因此我不能在这里复制它.解决方案非常类似于Dane White的解决方案,但它使用带有单个线程的线程池加上schedule()以避免浪费线程以等待超时.
它还会抛出一个TimeoutException而不是返回默认值.
我认为在提供默认值时您总是需要额外的线程监视。我可能会采用两次 SupplyAsync 调用的方式,默认调用包含在实用程序 API 中,并通过 AcceptEither 链接。如果您想包装您的供应商,那么您可以使用实用程序 API 来为您进行“任一”调用:
public class TimeoutDefault {
public static <T> CompletableFuture<T> with(T t, int ms) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(ms);
} catch (InterruptedException e) { }
return t;
});
}
public static <T> Supplier<T> with(Supplier<T> supplier, T t, int ms) {
return () -> CompletableFuture.supplyAsync(supplier)
.applyToEither(TimeoutDefault.with(t, ms), i -> i).join();
}
}
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(Example::createFoo)
.acceptEither(
TimeoutDefault.with("default", 1000),
Example::doStuffWithFoo);
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000))
.thenAccept(Example::doStuffWithFoo);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
20184 次 |
| 最近记录: |