标签: completable-future

什么是等待可完成的未来线程完成的推荐方法

CompletableFuture在代码中使用如下所示.但关于我应该等到所有可运行完成的方式,我找到了两种方法,我不知道它们之间的区别,哪一种是最佳实践?它们如下:

代码:

this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedSERun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedNWRun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedNERun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
this.growSeedFutureList = CompletableFuture.runAsync(new GrowSeedSWRun(this.saliencyMat, this.seedXY, this.seedVal), this.growSeedExecutor);
Run Code Online (Sandbox Code Playgroud)

第一种等待所有runnables完成的方法:

this.growSeedExecutor.shutdown();
this.growSeedExecutor.awaitTermination(1, TimeUnit.DAYS);
Run Code Online (Sandbox Code Playgroud)

等待所有runnables完成的第二种方法:

CompletableFuture.allOf(this.growSeedFutureList).join();
Run Code Online (Sandbox Code Playgroud)

请让我知道推荐哪一个.

java concurrency multithreading completable-future

24
推荐指数
2
解决办法
2万
查看次数

如何中断CompletableFuture的底层执行

我知道CompletableFuture设计不能通过中断来控制它的执行,但我想你们中的一些人可能会遇到这个问题.CompletableFutures是组合异步执行的非常好的方法,但考虑到你希望在取消未来时中断或停止底层执行的情况,我们该怎么做?或者我们必须接受任何取消或手动完成CompletableFuture不会影响在那里完成它的线程?

也就是说,在我看来,显然是一项无用的工作需要时间来执行工人.我想知道在这种情况下哪种方法或设计可能有用?

UPDATE

这是一个简单的测试

public class SimpleTest {

  @Test
  public void testCompletableFuture() throws Exception {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());

    bearSleep(1);

    //cf.cancel(true);
    cf.complete(null);

    System.out.println("it should die now already");
    bearSleep(7);
  }

  public static void longOperation(){
    System.out.println("started");
    bearSleep(5);
    System.out.println("completed");
  }

  private static void bearSleep(long seconds){
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      System.out.println("OMG!!! Interrupt!!!");
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

java concurrency completable-future

23
推荐指数
1
解决办法
9744
查看次数

CompletableFuture:等待第一个正常返回?

我有一些CompletableFutures,我想并行运行它们,等待正常返回的第一个.

我知道我可以CompletableFuture.anyOf用来等待第一次返回,但这将正常异常返回.我想忽略异常.

List<CompletableFuture<?>> futures = names.stream().map(
  (String name) ->
    CompletableFuture.supplyAsync(
      () ->
        // this calling may throw exceptions.
        new Task(name).run()
    )
).collect(Collectors.toList());
//FIXME Can not ignore exceptionally returned takes.
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{}));
try {
    logger.info(any.get().toString());
} catch (Exception e) {
    e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)

java java-8 completable-future

21
推荐指数
2
解决办法
6736
查看次数

CompletableFuture的完成处理程序在哪个线程中执行?

我有一个关于CompletableFuture方法的问题:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
Run Code Online (Sandbox Code Playgroud)

事情是JavaDoc说的就是这样:

返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的结果作为所提供函数的参数执行.有关特殊完成的规则​​,请参阅CompletionStage文档.

线程怎么样?这将在哪个线程中执行?如果未来由线程池完成怎么办?

java multithreading forkjoinpool completable-future

21
推荐指数
4
解决办法
3929
查看次数

CompletableFuture,可变对象和内存可见性

我试图理解CompletableFutureJava 8 如何与Java内存模型交互.在我看来,对于程序员的理智,理想情况下应该成立:

  1. 完成任务的线程中的操作CompletableFuture - 在执行任何完成相关的阶段之前
  2. 在该线程操作注册完成创建依赖阶段发生,之前完成执行依赖阶段

java.util.concurrent文档中有一条说明:

在提交Runnable到执行之前的线程中的操作Executor - 在执行开始之前.同样的Callables提交给ExecutorService.

这表明第一个属性为true,只要完成未来的线程执行完成依赖阶段或将其提交给Executor.另一方面,在阅读CompletableFuture文档后,我不太确定:

为非异步方法的依赖完成提供的动作可以由完成当前的线程执行CompletableFuture,或者由完成方法的任何其他调用者执行.

这让我想到了我的问题:

  1. 这两个假设属性是否真实?
  2. 在使用时,是否有任何关于存在或缺乏内存可见性保证的特定文档CompletableFuture

附录:

在具体示例中,请考虑以下代码:

List<String> list1 = new ArrayList<>();
list1.add("foo");

CompletableFuture<List<String>> future =
        CompletableFuture.supplyAsync(() -> {
            List<String> list2 = new ArrayList<>();
            list2.addAll(list1);
            return list2;
        });
Run Code Online (Sandbox Code Playgroud)

能够保证所有的加入"foo"list1是可见的lambda函数?是否保证添加 …

java java.util.concurrent java-memory-model java-8 completable-future

20
推荐指数
1
解决办法
884
查看次数

'CompletionStage'和'CompletableFuture'有什么区别?

我已经在每一个中看到了一个例子,但是我需要确切地知道深层有什么不同,因为有时候我认为我可以使用它们来获得相同的结果,所以我想知道我可以选择正确的一?

使用它们各有什么好处?

像这个例子一样,两个都有效:

public CompletionStage<Result> getNextQueryUUID() {
    return CompletableFuture.supplyAsync(() -> {
        String nextId = dbRequestService.getNextRequestQueryUUID();
        return ok(nextId);
    }, executor);
}


public CompletableFuture<Result> getNextQueryUUID() {
    return CompletableFuture.supplyAsync(() -> {
        String nextId = dbRequestService.getNextRequestQueryUUID();
        return ok(nextId);
    }, executor);
}
Run Code Online (Sandbox Code Playgroud)

这个例子运行于Play framework.

java concurrency completable-future

20
推荐指数
2
解决办法
7911
查看次数

如何取消Java 8可完成的未来?

我正在玩Java 8可完成的期货.我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish
Run Code Online (Sandbox Code Playgroud)

使用runAsync我计划执行等待锁存器的代码.接下来我取消了未来,期望被抛入中断的异常.但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException.使用ExecutorService的等效代码按预期工作.它是CompletableFuture中的错误还是我的示例中的错误?

java multithreading java-8 completable-future

17
推荐指数
3
解决办法
1万
查看次数

传递给CompletableFuture.exceptionally()的异常处理程序是否必须返回有意义的值?

我习惯了ListenableFuture模式,有onSuccess()onFailure()回调,例如

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> future = service.submit(...)
Futures.addCallback(future, new FutureCallback<String>() {
  public void onSuccess(String result) {
    handleResult(result);
  }
  public void onFailure(Throwable t) {
    log.error("Unexpected error", t);
  }
})
Run Code Online (Sandbox Code Playgroud)

看起来Java 8的CompletableFuture目的是处理或多或少相同的用例.天真的,我可以开始将上面的例子翻译为:

CompletableFuture<String> future = CompletableFuture<String>.supplyAsync(...)
  .thenAccept(this::handleResult)
  .exceptionally((t) -> log.error("Unexpected error", t));
Run Code Online (Sandbox Code Playgroud)

这肯定不如ListenableFuture版本那么冗长,看起来非常有前景.

但是,它没有编译,因为exceptionally()不需要a Consumer<Throwable>,它需要一个Function<Throwable, ? extends T>- 在这种情况下,a Function<Throwable, ? extends String>.

这意味着我不能只记录错误,我必须String在错误情况下提出一个返回值,并且在错误情况下没有有意义的String值返回.我可以返回null,只是为了获得编译代码:

  .exceptionally((t) -> {
    log.error("Unexpected …
Run Code Online (Sandbox Code Playgroud)

java completable-future

17
推荐指数
2
解决办法
1万
查看次数

将CompletableFuture <Stream <T >>转换为Publisher <T>是否正确?

允许对结果流进行多次迭代,CompletableFuture<Stream<String>>我正在考虑以下方法之一:

  1. 将生成的未来转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))

  2. 将生成的未来转换为Flux<String>缓存:Flux.fromStream(teams::join).cache();

Flux<T>Publisher<T>项目反应堆的实施.

使用案例:

我想Stream<String>从一个数据源获得一个具有顶级联赛球队名称的序列(例如),该数据源提供一个League对象Standing[](基于足球数据RESTful API,例如http://api.football-data.org/v1/ soccerseasons/445/leagueTable).使用AsyncHttpClientGson我们有:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));
Run Code Online (Sandbox Code Playgroud)

要重新使用生成的流,我有两个选择:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()
Run Code Online (Sandbox Code Playgroud)

Flux<T>不那么冗长,并提供我所需要的一切.然而,在这种情况下使用它是否正确?

或者我应该使用CompletableFuture<List<String>>?或者还有其他更好的选择吗?

更新了一些想法(2018-03-16):

CompletableFuture<List<String>>:

  • [PROS] List<String>将继续收集,当我们需要继续处理未来的结果时,可能已经完成了.
  • [CONS]宣言冗长.
  • [CONS]如果我们只想使用它一次,那么我们就不需要收集那些物品了List<T>.

Flux<String> …

java java-8 rx-java project-reactor completable-future

17
推荐指数
1
解决办法
2030
查看次数

什么是CompletableFuture相当于flatMap?

我有这种奇怪的类型,CompletableFuture<CompletableFuture<byte[]>>但我想要CompletableFuture<byte[]>.这可能吗?

public Future<byte[]> convert(byte[] htmlBytes) {
    PhantomPdfMessage htmlMessage = new PhantomPdfMessage();
    htmlMessage.setId(UUID.randomUUID());
    htmlMessage.setTimestamp(new Date());
    htmlMessage.setEncodedContent(Base64.getEncoder().encodeToString(htmlBytes));

    CompletableFuture<CompletableFuture<byte[]>> thenApply = CompletableFuture.supplyAsync(this::getPhantom, threadPool).thenApply(
        worker -> worker.convert(htmlMessage).thenApply(
            pdfMessage -> Base64.getDecoder().decode(pdfMessage.getEncodedContent())
        )
    );

}
Run Code Online (Sandbox Code Playgroud)

java java-8 completable-future

16
推荐指数
1
解决办法
3662
查看次数