CompletableFuture - 并行运行多个 rest 调用并获得不同的结果

Kev*_*ave 3 java multithreading java-8 spring-boot completable-future

我有一个相当普遍或独特的要求。例如,我有以下AccountDetails列表:

List<AccountDetails>

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;   
}
Run Code Online (Sandbox Code Playgroud)

上述所有字段,除了bankAccountId从外部 REST 服务调用中提取的。我想并行调用所有 REST 服务并更新列表中的每个对象:

所以,它看起来像下面:

对于每个 accountDetails

  • 调用抵押 REST 服务并更新martgageAccountId字段(REST 返回 MortgageInfo 对象)
  • 调用事务 REST 服务并更新noOfTrans字段(REST 返回Transactions对象)
  • 调用地址 REST 服务和更新addressLine字段(REST 返回Address对象)
  • 调用链接 REST 服务并更新externalLink字段。(REST 返回Links对象)

我希望并行执行上述所有调用,并且针对AcccountDetails列表中的每个对象。如果有异常,我想优雅地处理它。请注意,上述每个 REST 服务都返回不同的自定义对象

我对如何通过CompletableFuture链接实现这一点感到困惑。不确定allOfthenCombine(只需要两个),或者thenCompose应该使用以及如何将所有这些放在一起。

任何例子/想法?

The*_*nce 6

AccountDetails accountDetails = new AccountDetails();

CompletableFuture.allOf(
                        CompletableFuture.
                                supplyAsync(() -> //CALL MORTAGE INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setMortgageAccountId(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setNoOfTrans(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setAddressLine(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setExternalLink(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                ).join();
Run Code Online (Sandbox Code Playgroud)


Min*_*ang 6

如果我简单地将您的帐户类别设置为:

class Account {
  String fieldA;
  String fieldB;
  String fieldC;

  Account(String fieldA, String fieldB, String fieldC) {
    this.fieldA = fieldA;
    this.fieldB = fieldB;
    this.fieldC = fieldC;
  }
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以用来CompletableFuture#allOf(...)等待所有可完成的 future 的结果,每个字段更新一个,然后分别从这些 future 中检索结果。我们不能使用 的结果,allOf因为它什么也不返回(void)。

Account account = CompletableFuture.allOf(cfA, cfB, cfC)
    .thenApply(ignored -> {
      String a = cfA.join();
      String b = cfB.join();
      String c = cfC.join();
      return new Account(a, b, c);
    }).join(); // or get(...) with timeout
Run Code Online (Sandbox Code Playgroud)

我们可以在 the 中使用 join,thenApply因为所有可完成的 future 都在这个阶段完成。您可以修改上面的代码块以适应您的逻辑,例如更新字段而不是创建新对象。请注意,join()当可完成的未来异常完成时,上述情况可能会引发异常。您可以handle()在将 future 提交给 之前正确更改您的可完成的 future,或者在使用之前allOf(...)询问是否可以:isCompletedExceptionally()join()

CompletableFuture.allOf(cfA, cfB, cfC)
    .thenRun(() -> {
      if (!cfA.isCompletedExceptionally()) {
        account.fieldA = cfA.join();
      }
      if (!cfB.isCompletedExceptionally()) {
        account.fieldB = cfB.join();
      }
      if (!cfC.isCompletedExceptionally()) {
        account.fieldC = cfC.join();
      }
    }).join(); // or get(...) with timeout
Run Code Online (Sandbox Code Playgroud)

在一个完成阶段内更新字段的好处是这些操作是在同一个线程中完成的,因此您不必担心并发修改。