我CompletableFuture在代码中使用如下所示.但关于我应该等到所有可运行完成的方式,我找到了两种方法,我不知道它们之间的区别,哪一种是最佳实践?它们如下:
代码:
this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedSERun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedNWRun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedNERun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedSWRun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
Run Code Online (Sandbox Code Playgroud)
第一种等待所有runnables完成的方法:
this.growSeedExecutor.shutdown();
this.growSeedExecutor.awaitTermination(1, TimeUnit.DAYS);
Run Code Online (Sandbox Code Playgroud)
等待所有runnables完成的第二种方法:
CompletableFuture.allOf(this.growSeedFutureList).join();
Run Code Online (Sandbox Code Playgroud)
请让我知道推荐哪一个.
我知道CompletableFuture设计不能通过中断来控制它的执行,但我想你们中的一些人可能会遇到这个问题.CompletableFutures是组合异步执行的非常好的方法,但考虑到你希望在取消未来时中断或停止底层执行的情况,我们该怎么做?或者我们必须接受任何取消或手动完成CompletableFuture不会影响在那里完成它的线程?
也就是说,在我看来,显然是一项无用的工作需要时间来执行工人.我想知道在这种情况下哪种方法或设计可能有用?
UPDATE
这是一个简单的测试
public class SimpleTest {
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());
bearSleep(1);
//cf.cancel(true);
cf.complete(null);
System.out.println("it should die now already");
bearSleep(7);
}
public static void longOperation(){
System.out.println("started");
bearSleep(5);
System.out.println("completed");
}
private static void bearSleep(long seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
System.out.println("OMG!!! Interrupt!!!");
}
}
}
Run Code Online (Sandbox Code Playgroud) 我有一些CompletableFutures,我想并行运行它们,等待正常返回的第一个.
我知道我可以CompletableFuture.anyOf用来等待第一次返回,但这将正常或异常返回.我想忽略异常.
List<CompletableFuture<?>> futures = names.stream().map(
(String name) ->
CompletableFuture.supplyAsync(
() ->
// this calling may throw exceptions.
new Task(name).run()
)
).collect(Collectors.toList());
//FIXME Can not ignore exceptionally returned takes.
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{}));
try {
logger.info(any.get().toString());
} catch (Exception e) {
e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud) 我有一个关于CompletableFuture方法的问题:
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
Run Code Online (Sandbox Code Playgroud)
事情是JavaDoc说的就是这样:
返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的结果作为所提供函数的参数执行.有关特殊完成的规则,请参阅CompletionStage文档.
线程怎么样?这将在哪个线程中执行?如果未来由线程池完成怎么办?
我试图理解CompletableFutureJava 8 如何与Java内存模型交互.在我看来,对于程序员的理智,理想情况下应该成立:
CompletableFuture - 在执行任何java.util.concurrent文档中有一条说明:
在提交
Runnable到执行之前的线程中的操作Executor- 在执行开始之前.同样的Callables提交给ExecutorService.
这表明第一个属性为true,只要完成未来的线程执行完成依赖阶段或将其提交给Executor.另一方面,在阅读CompletableFuture文档后,我不太确定:
为非异步方法的依赖完成提供的动作可以由完成当前的线程执行
CompletableFuture,或者由完成方法的任何其他调用者执行.
这让我想到了我的问题:
CompletableFuture?附录:
在具体示例中,请考虑以下代码:
List<String> list1 = new ArrayList<>();
list1.add("foo");
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(() -> {
List<String> list2 = new ArrayList<>();
list2.addAll(list1);
return list2;
});
Run Code Online (Sandbox Code Playgroud)
能够保证所有的加入"foo"到list1是可见的lambda函数?是否保证添加 …
java java.util.concurrent java-memory-model java-8 completable-future
我已经在每一个中看到了一个例子,但是我需要确切地知道深层有什么不同,因为有时候我认为我可以使用它们来获得相同的结果,所以我想知道我可以选择正确的一?
使用它们各有什么好处?
像这个例子一样,两个都有效:
public CompletionStage<Result> getNextQueryUUID() {
return CompletableFuture.supplyAsync(() -> {
String nextId = dbRequestService.getNextRequestQueryUUID();
return ok(nextId);
}, executor);
}
public CompletableFuture<Result> getNextQueryUUID() {
return CompletableFuture.supplyAsync(() -> {
String nextId = dbRequestService.getNextRequestQueryUUID();
return ok(nextId);
}, executor);
}
Run Code Online (Sandbox Code Playgroud)
这个例子运行于
Play framework.
我正在玩Java 8可完成的期货.我有以下代码:
CountDownLatch waitLatch = new CountDownLatch(1);
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("Wait");
waitLatch.await(); //cancel should interrupt
System.out.println("Done");
} catch (InterruptedException e) {
System.out.println("Interrupted");
throw new RuntimeException(e);
}
});
sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");
assertTrue(future.isCancelled());
assertTrue(future.isDone());
sleep(100); //give it some time to finish
Run Code Online (Sandbox Code Playgroud)
使用runAsync我计划执行等待锁存器的代码.接下来我取消了未来,期望被抛入中断的异常.但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException.使用ExecutorService的等效代码按预期工作.它是CompletableFuture中的错误还是我的示例中的错误?
我习惯了ListenableFuture模式,有onSuccess()和onFailure()回调,例如
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> future = service.submit(...)
Futures.addCallback(future, new FutureCallback<String>() {
public void onSuccess(String result) {
handleResult(result);
}
public void onFailure(Throwable t) {
log.error("Unexpected error", t);
}
})
Run Code Online (Sandbox Code Playgroud)
看起来Java 8的CompletableFuture目的是处理或多或少相同的用例.天真的,我可以开始将上面的例子翻译为:
CompletableFuture<String> future = CompletableFuture<String>.supplyAsync(...)
.thenAccept(this::handleResult)
.exceptionally((t) -> log.error("Unexpected error", t));
Run Code Online (Sandbox Code Playgroud)
这肯定不如ListenableFuture版本那么冗长,看起来非常有前景.
但是,它没有编译,因为exceptionally()不需要a Consumer<Throwable>,它需要一个Function<Throwable, ? extends T>- 在这种情况下,a Function<Throwable, ? extends String>.
这意味着我不能只记录错误,我必须String在错误情况下提出一个返回值,并且在错误情况下没有有意义的String值返回.我可以返回null,只是为了获得编译代码:
.exceptionally((t) -> {
log.error("Unexpected …Run Code Online (Sandbox Code Playgroud) 允许对结果流进行多次迭代,CompletableFuture<Stream<String>>我正在考虑以下方法之一:
将生成的未来转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))
将生成的未来转换为Flux<String>缓存:Flux.fromStream(teams::join).cache();
Flux<T>是Publisher<T>项目反应堆的实施.
使用案例:
我想Stream<String>从一个数据源获得一个具有顶级联赛球队名称的序列(例如),该数据源提供一个League对象Standing[](基于足球数据RESTful API,例如http://api.football-data.org/v1/ soccerseasons/445/leagueTable).使用AsyncHttpClient和Gson我们有:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
Run Code Online (Sandbox Code Playgroud)
要重新使用生成的流,我有两个选择:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Run Code Online (Sandbox Code Playgroud)
Flux<T>不那么冗长,并提供我所需要的一切.然而,在这种情况下使用它是否正确?
或者我应该使用CompletableFuture<List<String>>?或者还有其他更好的选择吗?
更新了一些想法(2018-03-16):
CompletableFuture<List<String>>:
List<String>将继续收集,当我们需要继续处理未来的结果时,可能已经完成了.List<T>.Flux<String> …
我有这种奇怪的类型,CompletableFuture<CompletableFuture<byte[]>>但我想要CompletableFuture<byte[]>.这可能吗?
public Future<byte[]> convert(byte[] htmlBytes) {
PhantomPdfMessage htmlMessage = new PhantomPdfMessage();
htmlMessage.setId(UUID.randomUUID());
htmlMessage.setTimestamp(new Date());
htmlMessage.setEncodedContent(Base64.getEncoder().encodeToString(htmlBytes));
CompletableFuture<CompletableFuture<byte[]>> thenApply = CompletableFuture.supplyAsync(this::getPhantom, threadPool).thenApply(
worker -> worker.convert(htmlMessage).thenApply(
pdfMessage -> Base64.getDecoder().decode(pdfMessage.getEncodedContent())
)
);
}
Run Code Online (Sandbox Code Playgroud)