Dan*_*ood 79 java future java-8
Java 8引入CompletableFuture了一个可组合的Future的新实现(包括一堆thenXxx方法).我想独占使用它,但我想使用的许多库只返回不可组合的Future实例.
有没有办法将返回的Future实例包装在一个内部,CompleteableFuture以便我可以编写它?
nos*_*sid 51
有一种方法,但你不会喜欢它.以下方法将a Future<T>转换为CompletableFuture<T>:
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
Run Code Online (Sandbox Code Playgroud)
显然,这种方法的问题在于,对于每个Future,线程将被阻塞以等待Future的结果 - 与未来的想法相对立.在某些情况下,可能会做得更好.但是,一般来说,没有积极等待未来结果就没有解决方案.
Kaf*_*que 48
如果您要使用的库除了Future样式之外还提供回调样式方法,您可以为它提供一个处理程序来完成CompletableFuture而不会有任何额外的线程阻塞.像这样:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
// ...
CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
completableFuture.complete(buffer);
}
@Override
public void failed(Throwable exc, Void attachment) {
completableFuture.completeExceptionally(exc);
}
});
completableFuture.thenApply(...)
Run Code Online (Sandbox Code Playgroud)
如果没有回调,我看到解决这个问题的另一种方法是使用一个轮询循环,将所有Future.isDone()检查放在一个线程上,然后在Future可获取时调用complete.
Mat*_*ieu 33
如果您Future是调用ExecutorService方法(例如submit())的结果,最简单的CompletableFuture.runAsync(Runnable, Executor)方法是改用该方法。
从
Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);
Run Code Online (Sandbox Code Playgroud)
到
Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);
Run Code Online (Sandbox Code Playgroud)
该CompletableFuture则创建“原生地”。
编辑:通过@MartinAndersson 更正@SamMefford 的评论,如果您想传递 a Callable,则需要调用supplyAsync(),将 the 转换Callable<T>为 a Supplier<T>,例如:
CompletableFuture.supplyAsync(() -> {
try { return myCallable.call(); }
catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);
Run Code Online (Sandbox Code Playgroud)
因为T Callable.call() throws Exception;抛出异常而T Supplier.get();不会,所以您必须捕获异常以便原型兼容。
该get()方法未指定 a throws,这意味着它不应抛出已检查的异常。但是,可以使用未经检查的异常。中的代码CompletableFuture显示CompletionException已使用且未检查(即是 a RuntimeException),因此 catch/throw 将任何异常包装到 a 中CompletionException。
此外,正如@WeGa 所指出的,您可以使用该handle()方法来处理结果可能抛出的异常:
CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((ex,res) -> {
if (ex == null) {
// An exception occurred ...
} else {
// No exception was thrown, 'res' is valid and can be handled here
}
});
Run Code Online (Sandbox Code Playgroud)
我发布了一个小小的未来项目,试图比答案中的直接方式做得更好.
主要思想是使用唯一的一个线程(当然不仅仅是一个旋转循环)来检查里面的所有Futures状态,这有助于避免为每个Future - > CompletableFuture转换阻塞来自池的线程.
用法示例:
Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Run Code Online (Sandbox Code Playgroud)
建议:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
但是,基本上:
public class CompletablePromiseContext {
private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();
public static void schedule(Runnable r) {
SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
}
}
Run Code Online (Sandbox Code Playgroud)
并且,CompletablePromise:
public class CompletablePromise<V> extends CompletableFuture<V> {
private Future<V> future;
public CompletablePromise(Future<V> future) {
this.future = future;
CompletablePromiseContext.schedule(this::tryToComplete);
}
private void tryToComplete() {
if (future.isDone()) {
try {
complete(future.get());
} catch (InterruptedException e) {
completeExceptionally(e);
} catch (ExecutionException e) {
completeExceptionally(e.getCause());
}
return;
}
if (future.isCancelled()) {
cancel(true);
return;
}
CompletablePromiseContext.schedule(this::tryToComplete);
}
}
Run Code Online (Sandbox Code Playgroud)
例:
public class Main {
public static void main(String[] args) {
final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<String> stringFuture = service.submit(() -> "success");
final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);
completableFuture.whenComplete((result, failure) -> {
System.out.println(result);
});
}
}
Run Code Online (Sandbox Code Playgroud)
让我建议另一个(希望,更好)选项:https: //github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /同时
简而言之,这个想法如下:
CompletableTask<V>接口 - CompletionStage<V>+ 的并集
RunnableFuture<V>ExecutorService返回(而不是)CompletableTasksubmit(...)Future<V>实现使用替代的CompletionStage实现(注意,CompletionStage而不是CompletableFuture):
用法:
J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
.submit( someCallableA )
.thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
.thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
34894 次 |
| 最近记录: |