标签: completable-future

java多嵌套CompletionStage相当于"flatMap"?

所以考虑我有以下例子:

CompletionStage<String> tokenFuture = getToken();

CompletionStage<CompletionStage<CompletionStage<CompletionStage<Boolean>>>> result = tokenFuture.thenApply(token -> {

    WSRequest request = ws.url(url).setHeader("Authorization", "Bearer " + token);

    CompletionStage<WSResponse> response = request.post(json);

    return response.thenApply(r -> {
        if (r.getStatus() == 201) {
            return CompletableFuture.supplyAsync(() -> CompletableFuture.supplyAsync(() -> true));
        } else {
            return getToken().thenApply(t -> {
                WSRequest req = ws.url(url).setHeader("Authorization", "Bearer " + t);
                return req.post(json).thenApply(b -> b.getStatus() == 201);
            });
        }
    });

});
Run Code Online (Sandbox Code Playgroud)

我的问题是整个CompletionStage<CompletionStage<CompletionStage<CompletionStage<Boolean>>>>多嵌套的未来类型.是否有可能CompletionStage<Boolean>通过使用像flatMapScala之类的东西来减少它,还是有其他方法可以做到这一点?

java future completable-future

9
推荐指数
1
解决办法
2068
查看次数

如何创建异步堆栈跟踪?

更新:Intellij IDEA的最新版本实现了我正在寻找的东西.问题是如何在IDE之外实现这一点(因此我可以将异步堆栈跟踪转储到日志文件中),理想情况下不使用检测代理.


自从我将应用程序从同步模型转换为异步模型后,我遇到了调试失败的问题.

当我使用同步API时,我总是在异常堆栈跟踪中找到我的类,所以我知道从哪里开始查找是否出错.使用异步API,我得到的堆栈跟踪不会引用我的类,也不会指示哪个请求触发了失败.

我将给你一个具体的例子,但我对这类问题的一般解决方案感兴趣.

具体例子

我使用Jersey发出HTTP请求:

new Client().target("http://test.com/").request().rx().get(JsonNode.class);
Run Code Online (Sandbox Code Playgroud)

where rx()表示请求应异步发生,CompletionStage<JsonNode>而不是JsonNode直接返回.如果此调用失败,我会得到这个堆栈跟踪:

javax.ws.rs.ForbiddenException: HTTP 403 Authentication Failed
    at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1083)
    at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:883)
    at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:767)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:229)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:414)
    at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:765)
    at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:456)
    at org.glassfish.jersey.client.JerseyCompletionStageRxInvoker.lambda$method$1(JerseyCompletionStageRxInvoker.java:70)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
Run Code Online (Sandbox Code Playgroud)

注意:

  • 堆栈跟踪不引用用户代码.
  • 异常消息不包含有关触发错误的HTTP请求的上下文信息(HTTP方法,URI等).

因此,我无法将异常追溯到其来源.

为什么会这样

如果你在引擎盖下挖掘,你会发现泽西岛正在调用:

CompletableFuture.supplyAsync(() -> getSyncInvoker().method(name, entity, responseType))
Run Code Online (Sandbox Code Playgroud)

对于rx()调用.因为供应商是由泽西岛建造的,所以没有参考用户代码.

我试过的

我试图针对一个无关的异步示例提交针对Jetty 的错误报告,随后因安全原因被拒绝.

相反,我一直在添加上下文信息如下:

makeHttpRequest().exceptionally(e ->
{
    throw new RuntimeException(e);
});
Run Code Online (Sandbox Code Playgroud)

意思是,我 …

java debugging asynchronous completable-future

9
推荐指数
1
解决办法
846
查看次数

What are the ways to pass threadpoolexecutor to CompletableFuture?

I have been working on Java CompletableFuture lately and found , we should always use customized Threadpool. With it, I found two ways of passing threadpool to existing code. Like below

This is my ThreadPool in configuration file

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
  return new ThreadPoolTaskExecutor();
}
Run Code Online (Sandbox Code Playgroud)

1. Passing existingThreadPool in argument.

 @Autowired
 @Qualifier("commonThreadPool") 
 TaskExecutor existingThreadPool;       
 CompletableFuture.runAsync(() -> executeTask(),existingThreadPool);
Run Code Online (Sandbox Code Playgroud)

2. Using async like below

@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}
Run Code Online (Sandbox Code Playgroud)

is there any …

java multithreading spring-boot completable-future

9
推荐指数
1
解决办法
5605
查看次数

为什么`parallelStream` 比`CompletableFuture` 实现更快?

我想在某个操作上提高后端 REST API 的性能,该操作按顺序轮询多个不同的外部 API 并收集它们的响应并将它们全部扁平化为一个响应列表。

最近刚刚了解了CompletableFutures,我决定试一试,并将该解决方案与仅将 my 更改stream为 a 的解决方案进行比较parallelStream

这是用于基准测试的代码:

    package com.alithya.platon;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;


    public class ConcurrentTest {

        static final List<String> REST_APIS =
                Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
        MyTestUtil myTest = new MyTestUtil();
        long millisBefore; // used to benchmark

        @BeforeEach
        void setUp() {
            millisBefore = System.currentTimeMillis();
        }

        @AfterEach
        void tearDown() {
            System.out.printf("time taken …
Run Code Online (Sandbox Code Playgroud)

java benchmarking java-stream completable-future

9
推荐指数
1
解决办法
1161
查看次数

是否可以在没有锁的情况下快速解决多线程银行帐户问题?

我正在尝试解决多线程银行帐户问题*,而不使用锁,而是使用多版本并发控制。它正在工作。只是有点慢。我怎样才能加快速度?

(*) 我有 5 个用户,每个用户从 200 开始 - 每个人随机提取 100 并将 100 存入另一个用户拥有的另一个银行账户。我预计到运行结束时银行余额总计为 1000。不应损失或创造任何金钱。这部分适用于我下面的实现。

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

public class ConcurrentWithdrawer {

    private Map<String, Integer> database = new HashMap<>();
    private int transactionCount = 0;
    private final List<Transaction> transactions = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) {
        try {
            new ConcurrentWithdrawer().run();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static int getRandomNumberInRange(int min, int max) {

        if (min >= …
Run Code Online (Sandbox Code Playgroud)

java concurrency multithreading completable-future

9
推荐指数
1
解决办法
341
查看次数

如何使用 CompletableFuture 并行运行多个服务调用?

我正在向用户返回以下响应

class FinalResponseDTO {
    List<Service1ResponseDTO> service1ResponseDTO;
    Long totalCount;
    List<Service2ResponseDTO> service2ResponseDTO;
}
Run Code Online (Sandbox Code Playgroud)

截至目前,我正在进行三个连续调用来计算 this FinalResponseDTO,每个调用都可以独立于其他调用运行。我尝试制作三种不同CompletableFuture的:

CompletableFuture<List<Service1ResponseDTO> future1 = CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<Service2ResponseDTO> future3 = CompletableFuture.supplyAsync(() -> service3.callMethod3());
Run Code Online (Sandbox Code Playgroud)

如果我这样做CompletableFuture.allOf(future1, future2, future3).join();或我应该打电话吗CompletableFuture.allOf(future1, future2, future3).get();?即使我调用其中任何一个join,或者get我应该如何FinalResponseDTO从中构建。我对 Java 8 并发功能不熟悉,例如,CompletableFuture我很困惑,因为每个 future 的返回类型都不同,我应该如何获得所有这些 future 的组合响应,然后构造我的最终输出?

java multithreading asynchronous completable-future

9
推荐指数
1
解决办法
1万
查看次数

CompletableFuture#runAsync中的静态集合更新

前提条件(通用描述):

1.静态类字段

static List<String> ids = new ArrayList<>();
Run Code Online (Sandbox Code Playgroud)

2. CompletableFuture#runAsync(Runnable runnable,Executor executor)

static void main(String args[])方法中调用

3.step2添加到调用someCollection内部的元素runAsync

代码段(具体说明):

private static List<String> ids = new ArrayList<>();

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //...
    final List<String> lines = Files.lines(path).collect(Collectors.toList());
    for (List<String> lines : CollectionUtils.split(1024, lines)) {
         CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
             List<User> users = buildUsers();
             populate(users);
         }, executorService);

        futures.add(future);
    }

    private static void populate(List<User> users){ …
Run Code Online (Sandbox Code Playgroud)

java static field updating completable-future

8
推荐指数
1
解决办法
668
查看次数

如何通过访问时间而不是创建时间来优先等待 CompletableFutures?

TL;DR:当有几个CompletableFutures 等待执行时,我如何优先考虑那些我感兴趣的值?

我有一个 10,000CompletableFuture秒的列表(计算产品数据库内部报告的数据行):

List<Product> products = ...;

List<CompletableFuture<DataRow>> dataRows = products
    .stream()
    .map(p -> CompletableFuture.supplyAsync(() -> calculateDataRowForProduct(p), singleThreadedExecutor))
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

每个都需要大约50 毫秒才能完成,所以整个事情在500 秒内完成。(它们都共享相同的数据库连接,因此不能并行运行)。

假设我想访问第 9000 个产品的数据行: dataRows.get(9000).join()

问题是,所有这些 CompletableFuture 都是按照它们被创建的顺序执行的,而不是按照它们被访问的顺序。这意味着我必须等待450 秒才能计算出我目前不关心的内容,最终到达我想要的数据行。

:有什么办法改变这种行为,使期货我尝试访问GET优先对那些我不关心的时刻?

第一个想法

我注意到 aThreadPoolExecutor使用 aBlockingQueue<Runnable>来排队等待可用线程的条目。

因此,我考虑使用PriorityBlockingQueue, 来更改Runnable访问它时的优先级,CompletableFuture但是:

  • PriorityBlockingQueue没有方法重新排列现有元素的优先级,并且
  • 我需要想办法从 …

java performance asynchronous priority-queue completable-future

8
推荐指数
2
解决办法
397
查看次数

CompletableFuture 调用 uniApply 时出现 NullPointerException

我收到这个异常:

Caused by: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) ~[?:1.8.0_302]
at com.tcom.concurrent.ConcurrentUtils$3.onSuccess(ConcurrentUtils.java:140) ~[framework-20220815.38-RELEASE.jar:?]
Run Code Online (Sandbox Code Playgroud)

这种异常很少见,并且在我的系统中无法重现。

查看 CompletableFuture.java 似乎fFunction 变量为 null。

但第二行有一个空检查uniApply(),所以f不能为空。

那么为什么 JVM 声称我在函数调用行上有 NPE 呢?如果 NPE 来自调用的函数内部,我不应该在堆栈跟踪中看到它吗?


ConcurrentUtils.java 中的相关部分:

public static <I, O> CompletableFuture<O> buildCompletableFuture(final ListenableFuture<I> listenableFuture,
                                                                 final Function<I, O> responseApplier,
                                                                 final Consumer<I> onSuccessConsumer,
                                                                 final Consumer<Throwable> onFailureConsumer,
                                                                 final Supplier<O> defaultValueOnExceptionSupplier,
                                                                 final Executor callBackExecutor) {
    //create an instance of CompletableFuture
    final CompletableFuture<I> innerComplete = new CompletableFuture<I>() {
        @Override
        public …
Run Code Online (Sandbox Code Playgroud)

java asynchronous nullpointerexception java-8 completable-future

8
推荐指数
0
解决办法
3009
查看次数

用于顺序代码的 Java CompletableFuture

我的新团队正在编写一个 Java gRPC 服务,为了确保我们永远不会阻塞请求线程,我们最终将或多或少的所有方法包装在 CompletableFuture 中,即使这些端点在概念上是操作的顺序列表(无并行性)。

\n

所以代码看起来像这样(如果需要,最后可以提供 Java 示例):

\n
  methodA()\n    methodB()\n      methodD() (let say this one is a 15ms RPC call)\n      methodE()\n    methodC()\n      methodF() (let say this one is a 5ms CPU intensive work)\n      methodG()\n \n
Run Code Online (Sandbox Code Playgroud)\n

语境:

\n
    \n
  • 在实践中,应用程序要大得多,并且有更多的功能层
  • \n
  • 每个应用程序主机需要处理 1000 QPS,因此您可以想象以该速率调用 methodA
  • \n
  • 某些函数(少数)进行 RPC 调用,可能需要 5-30 毫秒 (IO)
  • \n
  • 某些功能(很少)运行 CPU 密集型工作(< 5ms)
  • \n
\n

编辑1:昨天在网上进行了更多阅读后,我明白,当且仅当我们使用真正的非阻塞HTTP和DB客户端(并且看起来JDBC不是非阻塞的)时,这种模式可以减少所需的线程总数。我的理解是,如果我们有足够的内存来为每个请求保留一个线程,那么使用同步代码仍然可能是最有效的实现(减少切换线程和加载数据的开销),但是如果我们没有足够的内存为了保持那么多线程处于活动状态,那么使整个代码成为非阻塞的概念可以减少线程数量,从而允许应用程序扩展到更多请求。

\n

问题一: \n我知道这会解锁“请求线程”,但实际上有什么好处?我们真的节省了 CPU 时间吗?在下面的示例中,感觉“某些”线程无论如何都会一直处于活动状态(在下面的示例中,主要是来自 methodD 中 CompletableFuture.supplyAsync 的线程),只是碰巧它\xe2\x80\x99s 不一样线程作为接收初始请求的线程。

\n

问题 …

java asynchronous nonblocking grpc completable-future

8
推荐指数
1
解决办法
1424
查看次数