CompletableFuture in循环:如何收集所有响应并处理错误

Rud*_*ira 5 java multithreading asynchronous java-8 completable-future

我试图PUT在循环中调用rest api for request.每个电话都是一个CompletableFuture.每个api调用都返回一个类型的对象RoomTypes.RoomType

  • 我想在不同的列表中收集响应(成功和错误响应).我如何实现这一目标?我确信我无法使用,allOf因为如果任何一个调用无法更新,它将无法获得所有结果.

  • 如何记录每次通话的错误/异常?


public void sendRequestsAsync(Map<Integer, List> map1) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
    List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Map.Entry<Integer, List> entry :map1.entrySet()) { 
        CompletableFuture requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () -> 
            //API call which returns object of type RoomTypes.RoomType
            updateService.updateRoom(51,33,759,entry.getKey(),
                           new RoomTypes.RoomType(entry.getKey(),map2.get(entry.getKey()),
                                    entry.getValue())),
                    yourOwnExecutor
            )//Supply the task you wanna run, in your case http request
            .thenApply(responses::add);

    completableFutures.add(requestCompletableFuture);
}
Run Code Online (Sandbox Code Playgroud)

Did*_*r L 8

您可以简单地使用allOf()以获得在所有初始期货完成时完成的未来(特殊情况或未完成),然后使用Collectors.partitioningBy()以下方法在成功和失败之间拆分:

List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (Map.Entry<Integer, List> entry : map1.entrySet()) {
    CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
            .supplyAsync(
                    () ->
                //API call which returns object of type RoomTypes.RoomType
                updateService.updateRoom(51, 33, 759, entry.getKey(),
                        new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                entry.getValue())),
                    yourOwnExecutor
            );

    completableFutures.add(requestCompletableFuture);
}

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
        // avoid throwing an exception in the join() call
        .exceptionally(ex -> null)
        .join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
        completableFutures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally)));
Run Code Online (Sandbox Code Playgroud)

生成的映射将包含一个true用于失败的期货的条目,以及另一个带有false成功的密钥的条目.然后,您可以检查2个条目以采取相应措施.

请注意,与原始代码相比,有2处略有变化:

  • requestCompletableFuture 现在是一个 CompletableFuture<RoomTypes.RoomType>
  • thenApply(responses::add)responses删除了该列表

关于日志记录/异常处理,只需添加相关内容requestCompletableFuture.handle()即可单独记录它们,但保留requestCompletableFuture而不是由此产生的handle().

  • 当他们的 `result` 类型与 `collect` 返回的内容不完全匹配时,我已经看到了它。尝试删除对 `result` 的赋值,并使用您的 IDE 再次从整个表达式中提取局部变量。 (3认同)
  • 我在“CompletableFuture::isCompletedExceptionally”处得到“非静态方法无法通过静态上下文访问”。但我的方法不是静态的。 (2认同)

Edw*_*rzo 6

另外,也许您可​​以从另一个角度解决问题,而不是强制使用CompletableFuture,而可以使用CompletionService

的整个想法CompletionService是,一旦为给定的未来找到了答案,它就会被放入队列中,您可以从中使用结果。

备选方案1:没有CompletableFuture

CompletionService<String> cs = new ExecutorCompletionService<>(executor);

List<Future<String>> futures = new ArrayList<>();

futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));


List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //at this point the future is guaranteed to be solved
            //so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}

System.out.println(successes); 
System.out.println(failures);
Run Code Online (Sandbox Code Playgroud)

产生:

[One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]
Run Code Online (Sandbox Code Playgroud)

备选方案2:具有CompletableFuture

但是,如果您确实需要处理,CompletableFuture也可以将它们直接提交到完成服务中,只需将它们直接放入队列中即可:

例如,以下变化具有相同的结果:

BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);

List<Future<String>> futures = new ArrayList<>();

futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));

//places all futures in completion service queue
tasks.addAll(futures);

List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //at this point the future is guaranteed to be solved
            //so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


小智 5

用于需要使用For循环的地方。这是一个可行的解决方案CompletableFuture.allOf() ->

\n

您想要下载某个网站 100 个不同网页的内容。您可以按顺序执行此操作,但这会花费很多时间。因此,可以编写一个接受网页链接并返回 CompletableFuture 的函数:

\n
CompletableFuture<String> downloadWebPage(String pageLink) {\nreturn CompletableFuture.supplyAsync(() -> {\n    // Code to download and return the web page's content\n});\n} \n
Run Code Online (Sandbox Code Playgroud)\n

循环调用前一个函数,我们使用的是JAVA 8

\n
List<String> webPageLinks = Arrays.asList(...)  // A list of 100 web page links\n\n// Download contents of all the web pages asynchronously\nList<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()\n    .map(webPageLink -> downloadWebPage(webPageLink))\n    .collect(Collectors.toList());\n\n\n// Create a combined Future using allOf()\nCompletableFuture<Void> allFutures = CompletableFuture.allOf(\n    pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])\n);\n
Run Code Online (Sandbox Code Playgroud)\n

CompletableFuture.allOf() 的问题是它返回 CompletableFuture。但是我们可以通过编写几行额外的代码来获取所有包装的 CompletableFutures 的结果

\n
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -\nCompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {\nreturn pageContentFutures.stream()\n       .map(pageContentFuture -> pageContentFuture.join())\n       .collect(Collectors.toList());\n});\n
Run Code Online (Sandbox Code Playgroud)\n

现在让\xe2\x80\x99s 计算包含我们的关键字的网页数量 ->

\n
// Count the number of web pages having the "CompletableFuture" keyword.\nCompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {\n return pageContents.stream()\n        .filter(pageContent -> pageContent.contains("CompletableFuture"))\n        .count();\n});\n\nSystem.out.println("Number of Web Pages having CompletableFuture keyword - " + \n    countFuture.get());\n
Run Code Online (Sandbox Code Playgroud)\n