Java 8 CompletableFuture中的默认值超时

jon*_*rry 27 java java-8

假设我有一些异步计算,例如:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .thenAccept(foo -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)

如果异步供应商根据某些指定的超时超时,是否有一种很好的方法为foo提供默认值?理想情况下,此类功能也会尝试取消运行缓慢的供应商.例如,是否存在类似于以下假设代码的标准库功能:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .acceptEither(
                CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
                foo -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)

或者甚至更好:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
        .thenAccept(foo -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)

我知道get(timeout, unit),但我想知道是否有一种更好的标准方式,以异步和反应方式应用超时,如上面的代码所示.

编辑:这是一个受Java 8启发的解决方案:在lambda表达式中强制检查异常处理.为什么强制,不是可选的?,但不幸的是它阻止了一个线程.如果我们依赖createFoo()来异步检查超时并抛出自己的超时异常,它可以在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的成本(可以没有"快速投掷"而昂贵

static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Throwable e2) {
            throw new RuntimeException(e2);
        }
    };
}
CompletableFuture
        .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
        .exceptionally(e -> "default")
        .thenAcceptAsync(s -> doStuffWithFoo(foo));
Run Code Online (Sandbox Code Playgroud)

Rub*_*ben 16

CompletableFuture.supplyAsync只是一个帮助方法,可以为您创建CompletableFuture,并将任务提交给ForkJoin池.

您可以按照以下要求创建自己的supplyAsync:

private static final ScheduledExecutorService schedulerExecutor = 
                                 Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService = 
                                 Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
        final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
        T defaultValue) {

    final CompletableFuture<T> cf = new CompletableFuture<T>();

    // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a 
    // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
    // Using Executors.newCachedThreadPool instead in the example
    // submit task
    Future<?> future = executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    //schedule watcher
    schedulerExecutor.schedule(() -> {
        if (!cf.isDone()) {
            cf.complete(defaultValue);
            future.cancel(true);
        }

    }, timeoutValue, timeUnit);

    return cf;
}
Run Code Online (Sandbox Code Playgroud)

使用该帮助器创建CompletableFuture就像在CompletableFuture中使用静态方法一样简单:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
            TimeUnit.SECONDS, "default");
Run Code Online (Sandbox Code Playgroud)

测试它:

    a = supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            // ignore
        }
        return "hi";
    }, 1, TimeUnit.SECONDS, "default");
Run Code Online (Sandbox Code Playgroud)

  • 我想知道Thread.sleep(2000); 在您的示例中实际上被打断了。不是**。如果我将您的示例从ForkJoinPool.commonPool()。submit更改为Executors.newFixedThreadPool(1).submit,那么它就是。我想知道为什么... (2认同)
  • 你是对的@Peti!ForkJoinPool中的commonPool提供了一个ForkJoinTask类型的Future的实现,在取消的情况下不会中断Future,而Executors则提供了一个执行的Future。请参见ForkJoinTask`的cancel方法中的Javadocs:_mayInterruptIfRunning:此值在默认实现中无效,因为不使用中断来控制cancel_ (2认同)

use*_*547 9

在Java 9中,将有completeOnTimeout(T值,长超时,TimeUnit单元),它可以满足您的需求,但它不会取消缓慢的供应商.

还有一个orTimeout(长超时,TimeUnit单元),在超时的情况下异常完成.


Aar*_*lla 7

DZone有一篇很好的文章如何解决这个问题:https://dzone.com/articles/asynchronous-timeouts

我不确定代码的版权,因此我不能在这里复制它.解决方案非常类似于Dane White的解决方案,但它使用带有单个线程的线程池加上schedule()以避免浪费线程以等待超时.

它还会抛出一个TimeoutException而不是返回默认值.


Dan*_*ite 4

我认为在提供默认值时您总是需要额外的线程监视。我可能会采用两次 SupplyAsync 调用的方式,默认调用包含在实用程序 API 中,并通过 AcceptEither 链接。如果您想包装您的供应商,那么您可以使用实用程序 API 来为您进行“任一”调用:

public class TimeoutDefault {
    public static <T> CompletableFuture<T> with(T t, int ms) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) { }
            return t;
        });
    }

    public static <T> Supplier<T> with(Supplier<T> supplier, T t, int ms) {
        return () -> CompletableFuture.supplyAsync(supplier)
            .applyToEither(TimeoutDefault.with(t, ms), i -> i).join();
    }
}

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(Example::createFoo)
        .acceptEither(
            TimeoutDefault.with("default", 1000),
            Example::doStuffWithFoo);

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000))
        .thenAccept(Example::doStuffWithFoo);
Run Code Online (Sandbox Code Playgroud)

  • 在线程上休眠并不是进行异步编程的好方法。您实际上白白浪费了线程空间。 (13认同)