标签: completable-future

Java 8 使用 CompletableFuture::join 维护流顺序

我有一个异步执行的查询输入流。我想确保当我使用时Completablefuture::join,这些要求的结果是按照输入查询流的顺序收集的。

这是我的代码的样子:

queries.stream()
     .map(query -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return SQLQueryEngine.execute(query);
                    } catch (InternalErrorException e) {
                        throw new RuntimeException(e);
                    }
     }))
     .map(CompletableFuture::join)
     .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

SQLQueryEngine.execute(查询); 返回一个List<Results>so 输出是List<List<Result>。我想展平并将所有结果合并到一个列表中。如果我在收集之前使用 .flatMap(List::stream) 来扁平化,它会保持排序吗?

java java-8 java-stream completable-future

3
推荐指数
1
解决办法
2582
查看次数

Guice 在执行 CompletableFuture 时抛出 OutOfScopeException

从请求范围的线程中,CompletableFuture必须由执行器中运行的任务来完成。所提供的供应商使用会话范围内的特定于域的服务MessageService。该服务是由 Guice 注入的。

public class MessageProcessingPage {
    private MessageService messageService;

    @Inject
    public MessagProcessingPage (MessageService messageService) {
        this.messageService = messageService;
    }

    // Called by request scoped thread.
    public void onProcessMessagesButton () {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(
        // Called from a thread from the threadpool.
        () -> {return messageService.retrieveMessageMetadataSet(x, y);}
        , executorService);

        ...

    }

    ...
}
Run Code Online (Sandbox Code Playgroud)

有一个被注入的MessageService(会话范围) 。MessageRestClient

@SessionScoped
public class MessageService {
    private MessageRestClient messageRestClient;

    @Inject
    public MessageRestClient (MessageRestClient messageRestClient) {
        this.messageRestClient = …
Run Code Online (Sandbox Code Playgroud)

java guice completable-future

3
推荐指数
1
解决办法
5411
查看次数

组合多个CompletableFutures

我有以下组成部分:

private JobInfo aggregateJobInfo() {
    final JobsResult jobsResult = restClient().getJobs();
    final List<String> jobIds = extractJobIds(jobsResult);

    //fetch details, exceptions and config for each job
    final List<JobDetails> jobDetails = jobIds.stream().map(jobId -> {
        final JobDetailResult jobDetailResult = restClient().getJobDetails(jobId);
        final JobExceptionsResult jobExceptionsResult = restClient().getJobExceptions(jobId);
        final JobConfigResult jobConfigResult = restClient().getJobConfig(jobId);
        return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
    }).collect(Collectors.toList());
    return new JobInfo(jobsResult, jobDetails);
}

private static List<String> extractJobIds(final JobsResult jobsResult) {
    final ArrayList<String> jobIds = new ArrayList<>();
    jobIds.addAll(jobsResult.getRunning());
    jobIds.addAll(jobsResult.getFinished());
    jobIds.addAll(jobsResult.getCanceled());
    jobIds.addAll(jobsResult.getFailed());
    return jobIds;
}
Run Code Online (Sandbox Code Playgroud)

它只是调用一些ENDPOINTS并修饰一些数据。现在,我试图通过使用CompletableFutures来实现这种非阻塞性,而我之前从未真正使用过。

private CompletableFuture<JobInfo> aggregateJobInfo() …
Run Code Online (Sandbox Code Playgroud)

java-8 completable-future

3
推荐指数
1
解决办法
8256
查看次数

如何在java 8中使用CompletableFuture启动异步任务并让主线程完成并退出

我有以下代码(或多或少):

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture
    .supplyAsync(()->{
        return longRunningMethodThatReturnsBoolean();
    }, executor)
    .thenAcceptAsync(taskResult -> {
        logResult();
        executor.shutdown(); 
    }, executor);
Run Code Online (Sandbox Code Playgroud)

这允许主线程中的代码继续,但是我希望主线程在完成时死亡,并且未来继续在它自己的线程中工作,但是主线程保持活动状态直到 CompletableFuture 完成,即使主线程没有不再做任何事情。

我对这个有点陌生,我错过了什么吗?甚至有可能吗?

任何帮助将不胜感激!!!

java multithreading completable-future

3
推荐指数
2
解决办法
2365
查看次数

如何在 Vertx 事件循环线程上运行 CompletableFuture 处理程序?

我有一个库 xyz,它为我提供了一个 CompletableFuture,我想在我的 Vertx (v3.5) 事件循环中处理它。目前我正在使用CompletableFuture.handle(BiFunction)(见下文),但我想使用CompletableFuture.handleAsync(BiFunction, Executor),但我无法弄清楚如何在此方法调用中向第二个参数提供 vertx 事件循环线程.

我尝试在Vertx.runOnContext() 中执行整个代码,但 handleAsync 内部的调用仍然在 Java 的 ForkJoin 池上执行,我想避免这种情况。

CompletableFuture<Void> f = xyz.someMethod();
f.handle((v, th) -> { //Want to run this in handleAsync()
   if (th == null) {
     future.complete(null);
   } else {
     future.completeExceptionally(th);
   }
   return null;
});
Run Code Online (Sandbox Code Playgroud)

java reactive-programming java-8 vert.x completable-future

3
推荐指数
1
解决办法
2730
查看次数

"Java 8 in Action"对它提供的演示有误吗?

此代码引自Java 8 in Action,也出现在本书11.4.3中.

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
Run Code Online (Sandbox Code Playgroud)

沿着代码,编写器附上如下,表示applyDiscount()同一个线程中的工作getPrice(),我强烈怀疑:这里有两个不同的Async后缀,这意味着第二个调用应该在另一个线程中.

在此输入图像描述

我使用以下代码在本地测试它:

private static void testBasic() {
    out.println("*****************************************");
    out.println("********** TESTING thenCompose **********");
    CompletableFuture[] futures = IntStream.rangeClosed(0, LEN).boxed()
            .map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), EXECUTOR_SERVICE))
            .map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> runStage2(i), EXECUTOR_SERVICE)))
            .toArray(size -> new CompletableFuture[size]);
    CompletableFuture.allOf(futures).join();
}
Run Code Online (Sandbox Code Playgroud)

输出进一步证明了我的想法,这是正确的吗?

***************************************** …
Run Code Online (Sandbox Code Playgroud)

java asynchronous java-8 completable-future

3
推荐指数
1
解决办法
106
查看次数

有没有办法延迟可完成未来的执行?

我将使用 CompletableFuture 进行异步调用。我想知道是否有办法延迟其执行。我对一个流有大约 5 个异步调用,并且我正在链接,然后根据需要使用 thenApply/thenCompose。我的问题是当我为调用创建第一个 CF 时,只要线程空闲,它就会开始执行。我希望首先我的所有任务都应该被链接起来,然后当我调用,说完成()时,它开始执行。我正在寻找的是类似于 Java 流中的中间操作。在我之前关于 SO 的一个问题中,我对此有一些帮助,但这并没有解决我的目的。

我的技术栈只允许 Java 8,所以不能使用下一个版本中推出的任何功能。

java asynchronous java-stream completable-future

3
推荐指数
1
解决办法
2447
查看次数

CompletableFuture.allOf 抛出异常时取消其他期货

当所有给定的期货完成时,Java 8CompletableFuture.allOf(CompletableFuture<?>...cfs)将返回一个CompletableFuture完成的,或者CompletionException如果其中一个期货完成并出现异常则抛出一个。

如果我的一个期货异常完成,会CompletableFuture.allOf等待剩余的期货完成后再抛出CompletionException还是会取消剩余的期货?

如果它等待所有期货完成,有什么办法让它在任何期货抛出异常时立即返回并取消剩余的期货?

java multithreading java-8 completable-future

3
推荐指数
1
解决办法
1553
查看次数

CompletableFuture - 并行运行多个 rest 调用并获得不同的结果

我有一个相当普遍或独特的要求。例如,我有以下AccountDetails列表:

List<AccountDetails>

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;   
}
Run Code Online (Sandbox Code Playgroud)

上述所有字段,除了bankAccountId从外部 REST 服务调用中提取的。我想并行调用所有 REST 服务并更新列表中的每个对象:

所以,它看起来像下面:

对于每个 accountDetails

  • 调用抵押 REST 服务并更新martgageAccountId字段(REST 返回 MortgageInfo 对象)
  • 调用事务 REST 服务并更新noOfTrans字段(REST 返回Transactions对象)
  • 调用地址 REST 服务和更新addressLine字段(REST 返回Address对象)
  • 调用链接 REST 服务并更新externalLink字段。(REST 返回Links对象)

我希望并行执行上述所有调用,并且针对AcccountDetails列表中的每个对象。如果有异常,我想优雅地处理它。请注意,上述每个 REST 服务都返回不同的自定义对象

我对如何通过CompletableFuture链接实现这一点感到困惑。不确定allOfthenCombine(只需要两个),或者thenCompose应该使用以及如何将所有这些放在一起。

任何例子/想法?

java multithreading java-8 spring-boot completable-future

3
推荐指数
2
解决办法
1万
查看次数

如果链中的第一个方法是异步的,那么一系列方法调用 (CompletableFuture API) 是否会异步执行?

我正在学习 CompletableFuture API,有一个例子:

CompletableFuture.completedFuture(url)
                 .thenComposeAsync(this::readPage, executor)
                 .thenApply(this::getImageURLs)
                 .thenApply(this::saveFoundImages)
                 .....
Run Code Online (Sandbox Code Playgroud)

我有一个问题:如果我将thenComposeAsync(...)方法作为第一个调用,链中的其他方法会在executor我通过参数传递的方法中执行,还是应该使用async调用其他方法以在特定执行程序中异步执行?

java concurrency multithreading asynchronous completable-future

3
推荐指数
1
解决办法
262
查看次数