标签: completable-future

JavaEE应用服务器中的CompletableFuture/parallelStream

鉴于新的Java8,我们为异步任务获得了非常好的功能,例如CompletableFuture和.paralellStream().如果你在Java SE中运行它,因为我已经理解它你将使用ForkJoinPool,但是如果我在例如Wildfly或TomcatEE中运行以下示例会发生什么?

//Here I start a comp.Future without giving an Executor
test = CompletableFuture.supplyAsync(() -> timeConsumingMethod());
//Here I start a parallel stream 
mList.paralell().filter(...).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

会发生什么,我将从哪里借用我的资源

  1. 这些示例在@Stateful bean中运行
  2. 这些示例在@Stateless bean中运行
  3. 这些示例在CDI bean中运行

java asynchronous java-ee java-ee-7 completable-future

8
推荐指数
1
解决办法
1550
查看次数

如何从Async Http Client请求获取CompletableFuture <T>?

Async Http Client文档中,我看到如何获取Future<Response>异步HTTP Get请求的结果,例如:

AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
Future<Response> f = asyncHttpClient
      .prepareGet("http://api.football-data.org/v1/soccerseasons/398")
      .execute();
Response r = f.get();
Run Code Online (Sandbox Code Playgroud)

但是,为方便起见,我希望得到一个CompletableFuture<T>替代方案,我可以应用一个将结果转换为其他内容的延续,例如将响应内容从Json反序列化为Java对象(例如SoccerSeason.java).这就是我想做的事情:

AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
CompletableFuture<Response> f = asyncHttpClient
     .prepareGet("http://api.football-data.org/v1/soccerseasons/398")
     .execute();
f
     .thenApply(r -> gson.fromJson(r.getResponseBody(), SoccerSeason.class))
     .thenAccept(System.out::println);
Run Code Online (Sandbox Code Playgroud)

根据Async Http Client文档,执行此操作的唯一方法是通过AsyncCompletionHandler<T>对象和使用promise.所以我为此构建了一个辅助方法:

CompletableFuture<Response> getDataAsync(String path){
    CompletableFuture<Response> promise = new CompletableFuture<>();
    asyncHttpClient
            .prepareGet(path)
            .execute(new AsyncCompletionHandler<Response>() {
                @Override
                public Response onCompleted(Response response) throws Exception {
                    promise.complete(response);
                    return response;
                }
                @Override …
Run Code Online (Sandbox Code Playgroud)

asynchronous java-8 asynchttpclient completable-future

8
推荐指数
1
解决办法
6129
查看次数

Difference between Java8 thenCompose and thenComposeAsync

Given this piece of code:

public List<String> findPrices(String product){
    List<CompletableFuture<String>> priceFutures =
    shops.stream()
         .map(shop -> CompletableFuture.supplyAsync(
                () -> shop.getPrice(product), executor))
         .map(future -> future.thenApply(Quote::parse))
         .map(future -> future.thenCompose(quote ->
                CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor
                )))
         .collect(toList());

    return priceFutures.stream()
                       .map(CompletableFuture::join)
                       .collect(toList());
}
Run Code Online (Sandbox Code Playgroud)

This part of it:

.map(future -> future.thenCompose(quote ->
                CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor
                )))
Run Code Online (Sandbox Code Playgroud)

Could it be rewrite as:

.map(future -> 
    future.thenComposeAsync(quote -> Discount.applyDiscount(quote), executor))
Run Code Online (Sandbox Code Playgroud)

I took this code from an example of a book and says the two solutions are …

java java-8 completable-future

8
推荐指数
1
解决办法
3883
查看次数

异步填充 Java Map 并将其作为未来返回

我有一个创建成本很高的对象映射,因此我想创建对象并与应用程序中的其他进程并行填充映射。只有当主线程实际需要访问地图时,应用程序才应等待填充地图的异步任务完成。我怎样才能最优雅地做到这一点?

当前方法

目前,我能够使用CompletableFuture.runAsync(Runnable, Executor)类似于下面的示例代码中的方式异步创建地图本身中的每个单独对象,但我不确定如何构建Future/ CompletableFuture-type 机制以Map在准备好时返回自身:

public static class AsynchronousMapPopulator {

    private final Executor backgroundJobExecutor;

    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;
    }

    public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, …
Run Code Online (Sandbox Code Playgroud)

multithreading asynchronous future java-8 completable-future

8
推荐指数
1
解决办法
4406
查看次数

CompletableFuture allof(..).join() vs CompletableFuture.join()

我目前正在使用 CompletableFuture supplyAsync() 方法将一些任务提交到公共线程池。下面是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);
Run Code Online (Sandbox Code Playgroud)

我想知道下面的代码与上面的代码有何不同。我从下面的代码中删除了父 completableFuture,并为每个 completableFuture 添加了 join() 而不是 getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);
Run Code Online (Sandbox Code Playgroud)

我在 spring 服务中使用它,并且存在线程池耗尽的问题。任何指针都深表感谢。

java spring multithreading threadpool completable-future

8
推荐指数
1
解决办法
2万
查看次数

Java8中的CompletableFuture

我有这段代码,我想重构Java 8

List<String> menus = new ArrayList<String>();           
for (Menu menu : resto1.getMenu()) {            
    MainIngredient mainIngredient = MainIngredient.getMainIngredient(menu.getName());           
    if (mainIngredient.getIngredient().indexOf("Vegan")!=-1) {
        menus.add(menu.getName());
    }                   
}
Run Code Online (Sandbox Code Playgroud)

重构这个简单的循环之后,似乎代码太多......我是否正确使用CompletableFutures?

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<MainIngredient>> priceFutureList = resto1.getMenu().stream()
    .map(menu -> CompletableFuture.supplyAsync(
        () -> MainIngredient.getMainIngredient(menu.getName()), executorService))
    .collect(Collectors.toList());        

CompletableFuture<Void> allFuturesDone = CompletableFuture.allOf(
    priceFutureList.toArray(new CompletableFuture[priceFutureList.size()]));

CompletableFuture<List<MainIngredient>> priceListFuture =        
    allFuturesDone.thenApply(v -> priceFutureList.stream()
        .map(CompletableFuture::join)
        .collect(toList()));
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream completable-future

8
推荐指数
1
解决办法
122
查看次数

收集 CompletableFuture.allOf() 执行中抛出的异常

有以下暂存代码:

  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Void> process1 = CompletableFuture.runAsync(() -> {
      System.out.println("Process 1 with exception");
      throw new RuntimeException("Exception 1");
    });

    CompletableFuture<Void> process2 = CompletableFuture.runAsync(() -> {
      System.out.println("Process 2 without exception");
    });

    CompletableFuture<Void> process3 = CompletableFuture.runAsync(() -> {
      System.out.println("Process 3 with exception");
      throw new RuntimeException("Exception 3");
    });

    CompletableFuture<Void> allOfProcesses = CompletableFuture.allOf(process1, process2, process3);

    allOfProcesses.get();
  }
Run Code Online (Sandbox Code Playgroud)

我正在寻找如何收集并行执行期间引发的所有异常CompletableFuture.allOf()并将其映射到列表的方法。

我知道我可以通过返回异常(CompletableFuture<Exception>)而不是通过使用抛出并收集它来做到这一点CompletableFuture::join,但我认为抛出异常方法比稍后返回并抛出它更好

java asynchronous java-threads completable-future

8
推荐指数
1
解决办法
6684
查看次数

如何在 Java 中执行非阻塞 HTTP 调用?

我有一个第三方 API,我使用 HTTP GET 请求调用它。每个请求需要几秒钟才能得到响应。

目前我正在使用 CompletableFuture,它在大小为 64 的 FixThreadPool 上执行。这会导致线程被阻塞,直到收到 GET 请求的响应,即线程在发送 GET 响应后处于空闲状态,直到收到响应。因此,我可以发送的最大并发请求数受到线程大小的限制,即此处的 64。

我可以使用什么来代替 CompletableFuture,这样我的线程就不会闲置等待响应?

java completable-future

8
推荐指数
1
解决办法
5082
查看次数

CompletableFuture 调用 uniApply 时出现 NullPointerException

我收到这个异常:

Caused by: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) ~[?:1.8.0_302]
at com.tcom.concurrent.ConcurrentUtils$3.onSuccess(ConcurrentUtils.java:140) ~[framework-20220815.38-RELEASE.jar:?]
Run Code Online (Sandbox Code Playgroud)

这种异常很少见,并且在我的系统中无法重现。

查看 CompletableFuture.java 似乎fFunction 变量为 null。

但第二行有一个空检查uniApply(),所以f不能为空。

那么为什么 JVM 声称我在函数调用行上有 NPE 呢?如果 NPE 来自调用的函数内部,我不应该在堆栈跟踪中看到它吗?


ConcurrentUtils.java 中的相关部分:

public static <I, O> CompletableFuture<O> buildCompletableFuture(final ListenableFuture<I> listenableFuture,
                                                                 final Function<I, O> responseApplier,
                                                                 final Consumer<I> onSuccessConsumer,
                                                                 final Consumer<Throwable> onFailureConsumer,
                                                                 final Supplier<O> defaultValueOnExceptionSupplier,
                                                                 final Executor callBackExecutor) {
    //create an instance of CompletableFuture
    final CompletableFuture<I> innerComplete = new CompletableFuture<I>() {
        @Override
        public …
Run Code Online (Sandbox Code Playgroud)

java asynchronous nullpointerexception java-8 completable-future

8
推荐指数
0
解决办法
3009
查看次数

用于顺序代码的 Java CompletableFuture

我的新团队正在编写一个 Java gRPC 服务,为了确保我们永远不会阻塞请求线程,我们最终将或多或少的所有方法包装在 CompletableFuture 中,即使这些端点在概念上是操作的顺序列表(无并行性)。

\n

所以代码看起来像这样(如果需要,最后可以提供 Java 示例):

\n
  methodA()\n    methodB()\n      methodD() (let say this one is a 15ms RPC call)\n      methodE()\n    methodC()\n      methodF() (let say this one is a 5ms CPU intensive work)\n      methodG()\n \n
Run Code Online (Sandbox Code Playgroud)\n

语境:

\n
    \n
  • 在实践中,应用程序要大得多,并且有更多的功能层
  • \n
  • 每个应用程序主机需要处理 1000 QPS,因此您可以想象以该速率调用 methodA
  • \n
  • 某些函数(少数)进行 RPC 调用,可能需要 5-30 毫秒 (IO)
  • \n
  • 某些功能(很少)运行 CPU 密集型工作(< 5ms)
  • \n
\n

编辑1:昨天在网上进行了更多阅读后,我明白,当且仅当我们使用真正的非阻塞HTTP和DB客户端(并且看起来JDBC不是非阻塞的)时,这种模式可以减少所需的线程总数。我的理解是,如果我们有足够的内存来为每个请求保留一个线程,那么使用同步代码仍然可能是最有效的实现(减少切换线程和加载数据的开销),但是如果我们没有足够的内存为了保持那么多线程处于活动状态,那么使整个代码成为非阻塞的概念可以减少线程数量,从而允许应用程序扩展到更多请求。

\n

问题一: \n我知道这会解锁“请求线程”,但实际上有什么好处?我们真的节省了 CPU 时间吗?在下面的示例中,感觉“某些”线程无论如何都会一直处于活动状态(在下面的示例中,主要是来自 methodD 中 CompletableFuture.supplyAsync 的线程),只是碰巧它\xe2\x80\x99s 不一样线程作为接收初始请求的线程。

\n

问题 …

java asynchronous nonblocking grpc completable-future

8
推荐指数
1
解决办法
1424
查看次数