将Java Future转换为CompletableFuture

Dan*_*ood 79 java future java-8

Java 8引入CompletableFuture了一个可组合的Future的新实现(包括一堆thenXxx方法).我想独占使用它,但我想使用的许多库只返回不可组合的Future实例.

有没有办法将返回的Future实例包装在一个内部,CompleteableFuture以便我可以编写它?

nos*_*sid 51

有一种方法,但你不会喜欢它.以下方法将a Future<T>转换为CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}
Run Code Online (Sandbox Code Playgroud)

显然,这种方法的问题在于,对于每个Future,线程将被阻塞以等待Future的结果 - 与未来的想法相对立.在某些情况下,可能会做得更好.但是,一般来说,没有积极等待未来结果就没有解决方案.

  • 嗯...这个解决方案不是吃"公共池"中的一个线程,只是为了等待?那些"公共池"线程永远不应该阻止......嗯...... (11认同)
  • 这可能并不完美,但是使用CompletableFuture.supplyAsync(supplier,new SinglethreadExecutor())至少不会阻塞公共池线程。 (3认同)
  • 求求你,永远不要那样做 (3认同)
  • @MikeFHay 我认为使用“FJP.ManagedBlocker”将比“CompletableFuture.supplyAsync(supplier, t -&gt; newSingleThreadExecutor())”有更好的利用率。我已经更新了答案中的代码以反映这一点。 (2认同)

Kaf*_*que 48

如果您要使用的库除了Future样式之外还提供回调样式方法,您可以为它提供一个处理程序来完成CompletableFuture而不会有任何额外的线程阻塞.像这样:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)
Run Code Online (Sandbox Code Playgroud)

如果没有回调,我看到解决这个问题的另一种方法是使用一个轮询循环,将所有Future.isDone()检查放在一个线程上,然后在Future可获取时调用complete.


Mat*_*ieu 33

如果您Future是调用ExecutorService方法(例如submit())的结果,最简单的CompletableFuture.runAsync(Runnable, Executor)方法是改用该方法。

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);
Run Code Online (Sandbox Code Playgroud)

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);
Run Code Online (Sandbox Code Playgroud)

CompletableFuture则创建“原生地”。

编辑:通过@MartinAndersson 更正@SamMefford 的评论,如果您想传递 a Callable,则需要调用supplyAsync(),将 the 转换Callable<T>为 a Supplier<T>,例如:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);
Run Code Online (Sandbox Code Playgroud)

因为T Callable.call() throws Exception;抛出异常而T Supplier.get();不会,所以您必须捕获异常以便原型兼容。

关于异常处理的说明

get()方法未指定 a throws,这意味着它不应抛出已检查的异常。但是,可以使用未经检查的异常。中的代码CompletableFuture显示CompletionException已使用且未检查(即是 a RuntimeException),因此 catch/throw 将任何异常包装到 a 中CompletionException

此外,正如@WeGa 所指出的,您可以使用该handle()方法来处理结果可能抛出的异常:

CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((ex,res) -> {
        if (ex == null) {
            // An exception occurred ...
        } else {
            // No exception was thrown, 'res' is valid and can be handled here
        }
    });
Run Code Online (Sandbox Code Playgroud)

  • 或者,如果您使用 Callable&lt;T&gt; 而不是 Runnable,请尝试 SupplyAsync: ```CompletableFuture&lt;T&gt; future = CompletableFuture.supplyAsync(myCallable, myExecutor);``` (2认同)

Dmi*_*kiy 7

我发布了一个小小的未来项目,试图比答案中的直接方式做得更好.

主要思想是使用唯一的一个线程(当然不仅仅是一个旋转循环)来检查里面的所有Futures状态,这有助于避免为每个Future - > CompletableFuture转换阻塞来自池的线程.

用法示例:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Run Code Online (Sandbox Code Playgroud)


Gab*_*sco 6

建议:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

但是,基本上:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}
Run Code Online (Sandbox Code Playgroud)

并且,CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}
Run Code Online (Sandbox Code Playgroud)

例:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}
Run Code Online (Sandbox Code Playgroud)


Val*_*aev 5

让我建议另一个(希望,更好)选项:https: //github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /同时

简而言之,这个想法如下:

  1. 介绍CompletableTask<V>接口 - CompletionStage<V>+ 的并集 RunnableFuture<V>
  2. 变形从方法ExecutorService返回(而不是)CompletableTasksubmit(...)Future<V>
  3. 完成后,我们有可运行和可组合的期货.

实现使用替代的CompletionStage实现(注意,CompletionStage而不是CompletableFuture):

用法:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 
Run Code Online (Sandbox Code Playgroud)

  • 小更新:代码被移动到单独的项目,https://github.com/vsilaev/tascalate-concurrent,现在可以使用来自java.util.concurrent的远程框Executor-s. (2认同)