所以考虑我有以下例子:
CompletionStage<String> tokenFuture = getToken();
CompletionStage<CompletionStage<CompletionStage<CompletionStage<Boolean>>>> result = tokenFuture.thenApply(token -> {
WSRequest request = ws.url(url).setHeader("Authorization", "Bearer " + token);
CompletionStage<WSResponse> response = request.post(json);
return response.thenApply(r -> {
if (r.getStatus() == 201) {
return CompletableFuture.supplyAsync(() -> CompletableFuture.supplyAsync(() -> true));
} else {
return getToken().thenApply(t -> {
WSRequest req = ws.url(url).setHeader("Authorization", "Bearer " + t);
return req.post(json).thenApply(b -> b.getStatus() == 201);
});
}
});
});
Run Code Online (Sandbox Code Playgroud)
我的问题是整个CompletionStage<CompletionStage<CompletionStage<CompletionStage<Boolean>>>>多嵌套的未来类型.是否有可能CompletionStage<Boolean>通过使用像flatMapScala之类的东西来减少它,还是有其他方法可以做到这一点?
更新:Intellij IDEA的最新版本实现了我正在寻找的东西.问题是如何在IDE之外实现这一点(因此我可以将异步堆栈跟踪转储到日志文件中),理想情况下不使用检测代理.
自从我将应用程序从同步模型转换为异步模型后,我遇到了调试失败的问题.
当我使用同步API时,我总是在异常堆栈跟踪中找到我的类,所以我知道从哪里开始查找是否出错.使用异步API,我得到的堆栈跟踪不会引用我的类,也不会指示哪个请求触发了失败.
我将给你一个具体的例子,但我对这类问题的一般解决方案感兴趣.
我使用Jersey发出HTTP请求:
new Client().target("http://test.com/").request().rx().get(JsonNode.class);
Run Code Online (Sandbox Code Playgroud)
where rx()表示请求应异步发生,CompletionStage<JsonNode>而不是JsonNode直接返回.如果此调用失败,我会得到这个堆栈跟踪:
javax.ws.rs.ForbiddenException: HTTP 403 Authentication Failed
at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1083)
at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:883)
at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:767)
at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
at org.glassfish.jersey.internal.Errors.process(Errors.java:229)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:414)
at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:765)
at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:456)
at org.glassfish.jersey.client.JerseyCompletionStageRxInvoker.lambda$method$1(JerseyCompletionStageRxInvoker.java:70)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
Run Code Online (Sandbox Code Playgroud)
注意:
因此,我无法将异常追溯到其来源.
如果你在引擎盖下挖掘,你会发现泽西岛正在调用:
CompletableFuture.supplyAsync(() -> getSyncInvoker().method(name, entity, responseType))
Run Code Online (Sandbox Code Playgroud)
对于rx()调用.因为供应商是由泽西岛建造的,所以没有参考用户代码.
我试图针对一个无关的异步示例提交针对Jetty 的错误报告,随后因安全原因被拒绝.
相反,我一直在添加上下文信息如下:
makeHttpRequest().exceptionally(e ->
{
throw new RuntimeException(e);
});
Run Code Online (Sandbox Code Playgroud)
意思是,我 …
I have been working on Java CompletableFuture lately and found , we should always use customized Threadpool. With it, I found two ways of passing threadpool to existing code. Like below
This is my ThreadPool in configuration file
@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
return new ThreadPoolTaskExecutor();
}
Run Code Online (Sandbox Code Playgroud)
1. Passing existingThreadPool in argument.
@Autowired
@Qualifier("commonThreadPool")
TaskExecutor existingThreadPool;
CompletableFuture.runAsync(() -> executeTask(),existingThreadPool);
Run Code Online (Sandbox Code Playgroud)
2. Using async like below
@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}
Run Code Online (Sandbox Code Playgroud)
is there any …
我想在某个操作上提高后端 REST API 的性能,该操作按顺序轮询多个不同的外部 API 并收集它们的响应并将它们全部扁平化为一个响应列表。
最近刚刚了解了CompletableFutures,我决定试一试,并将该解决方案与仅将 my 更改stream为 a 的解决方案进行比较parallelStream。
这是用于基准测试的代码:
package com.alithya.platon;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class ConcurrentTest {
static final List<String> REST_APIS =
Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
MyTestUtil myTest = new MyTestUtil();
long millisBefore; // used to benchmark
@BeforeEach
void setUp() {
millisBefore = System.currentTimeMillis();
}
@AfterEach
void tearDown() {
System.out.printf("time taken …Run Code Online (Sandbox Code Playgroud) 我正在尝试解决多线程银行帐户问题*,而不使用锁,而是使用多版本并发控制。它正在工作。只是有点慢。我怎样才能加快速度?
(*) 我有 5 个用户,每个用户从 200 开始 - 每个人随机提取 100 并将 100 存入另一个用户拥有的另一个银行账户。我预计到运行结束时银行余额总计为 1000。不应损失或创造任何金钱。这部分适用于我下面的实现。
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
public class ConcurrentWithdrawer {
private Map<String, Integer> database = new HashMap<>();
private int transactionCount = 0;
private final List<Transaction> transactions = Collections.synchronizedList(new ArrayList<>());
public static void main(String[] args) {
try {
new ConcurrentWithdrawer().run();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static int getRandomNumberInRange(int min, int max) {
if (min >= …Run Code Online (Sandbox Code Playgroud) 我正在向用户返回以下响应
class FinalResponseDTO {
List<Service1ResponseDTO> service1ResponseDTO;
Long totalCount;
List<Service2ResponseDTO> service2ResponseDTO;
}
Run Code Online (Sandbox Code Playgroud)
截至目前,我正在进行三个连续调用来计算 this FinalResponseDTO,每个调用都可以独立于其他调用运行。我尝试制作三种不同CompletableFuture的:
CompletableFuture<List<Service1ResponseDTO> future1 = CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<Service2ResponseDTO> future3 = CompletableFuture.supplyAsync(() -> service3.callMethod3());
Run Code Online (Sandbox Code Playgroud)
如果我这样做CompletableFuture.allOf(future1, future2, future3).join();或我应该打电话吗CompletableFuture.allOf(future1, future2, future3).get();?即使我调用其中任何一个join,或者get我应该如何FinalResponseDTO从中构建。我对 Java 8 并发功能不熟悉,例如,CompletableFuture我很困惑,因为每个 future 的返回类型都不同,我应该如何获得所有这些 future 的组合响应,然后构造我的最终输出?
前提条件(通用描述):
1.静态类字段
static List<String> ids = new ArrayList<>();
Run Code Online (Sandbox Code Playgroud)
2. CompletableFuture#runAsync(Runnable runnable,Executor executor)
在static void main(String args[])方法中调用
3.从step2添加到调用someCollection内部的元素runAsync
代码段(具体说明):
private static List<String> ids = new ArrayList<>();
public static void main(String[] args) throws ExecutionException, InterruptedException {
//...
final List<String> lines = Files.lines(path).collect(Collectors.toList());
for (List<String> lines : CollectionUtils.split(1024, lines)) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
List<User> users = buildUsers();
populate(users);
}, executorService);
futures.add(future);
}
private static void populate(List<User> users){ …Run Code Online (Sandbox Code Playgroud) TL;DR:当有几个CompletableFutures 等待执行时,我如何优先考虑那些我感兴趣的值?
我有一个 10,000CompletableFuture秒的列表(计算产品数据库内部报告的数据行):
List<Product> products = ...;
List<CompletableFuture<DataRow>> dataRows = products
.stream()
.map(p -> CompletableFuture.supplyAsync(() -> calculateDataRowForProduct(p), singleThreadedExecutor))
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
每个都需要大约50 毫秒才能完成,所以整个事情在500 秒内完成。(它们都共享相同的数据库连接,因此不能并行运行)。
假设我想访问第 9000 个产品的数据行:
dataRows.get(9000).join()
问题是,所有这些 CompletableFuture 都是按照它们被创建的顺序执行的,而不是按照它们被访问的顺序。这意味着我必须等待450 秒才能计算出我目前不关心的内容,最终到达我想要的数据行。
问:有什么办法改变这种行为,使期货我尝试访问GET优先对那些我不关心的时刻?
第一个想法:
我注意到 aThreadPoolExecutor使用 aBlockingQueue<Runnable>来排队等待可用线程的条目。
因此,我考虑使用PriorityBlockingQueue, 来更改Runnable访问它时的优先级,CompletableFuture但是:
PriorityBlockingQueue没有方法重新排列现有元素的优先级,并且java performance asynchronous priority-queue completable-future
我收到这个异常:
Caused by: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) ~[?:1.8.0_302]
at com.tcom.concurrent.ConcurrentUtils$3.onSuccess(ConcurrentUtils.java:140) ~[framework-20220815.38-RELEASE.jar:?]
Run Code Online (Sandbox Code Playgroud)
这种异常很少见,并且在我的系统中无法重现。
查看 CompletableFuture.java 似乎fFunction 变量为 null。
但第二行有一个空检查uniApply(),所以f不能为空。
那么为什么 JVM 声称我在函数调用行上有 NPE 呢?如果 NPE 来自调用的函数内部,我不应该在堆栈跟踪中看到它吗?
public static <I, O> CompletableFuture<O> buildCompletableFuture(final ListenableFuture<I> listenableFuture,
final Function<I, O> responseApplier,
final Consumer<I> onSuccessConsumer,
final Consumer<Throwable> onFailureConsumer,
final Supplier<O> defaultValueOnExceptionSupplier,
final Executor callBackExecutor) {
//create an instance of CompletableFuture
final CompletableFuture<I> innerComplete = new CompletableFuture<I>() {
@Override
public …Run Code Online (Sandbox Code Playgroud) java asynchronous nullpointerexception java-8 completable-future
我的新团队正在编写一个 Java gRPC 服务,为了确保我们永远不会阻塞请求线程,我们最终将或多或少的所有方法包装在 CompletableFuture 中,即使这些端点在概念上是操作的顺序列表(无并行性)。
\n所以代码看起来像这样(如果需要,最后可以提供 Java 示例):
\n methodA()\n methodB()\n methodD() (let say this one is a 15ms RPC call)\n methodE()\n methodC()\n methodF() (let say this one is a 5ms CPU intensive work)\n methodG()\n \nRun Code Online (Sandbox Code Playgroud)\n语境:
\n编辑1:昨天在网上进行了更多阅读后,我明白,当且仅当我们使用真正的非阻塞HTTP和DB客户端(并且看起来JDBC不是非阻塞的)时,这种模式可以减少所需的线程总数。我的理解是,如果我们有足够的内存来为每个请求保留一个线程,那么使用同步代码仍然可能是最有效的实现(减少切换线程和加载数据的开销),但是如果我们没有足够的内存为了保持那么多线程处于活动状态,那么使整个代码成为非阻塞的概念可以减少线程数量,从而允许应用程序扩展到更多请求。
\n问题一: \n我知道这会解锁“请求线程”,但实际上有什么好处?我们真的节省了 CPU 时间吗?在下面的示例中,感觉“某些”线程无论如何都会一直处于活动状态(在下面的示例中,主要是来自 methodD 中 CompletableFuture.supplyAsync 的线程),只是碰巧它\xe2\x80\x99s 不一样线程作为接收初始请求的线程。
\n问题 …
java ×10
asynchronous ×5
benchmarking ×1
concurrency ×1
debugging ×1
field ×1
future ×1
grpc ×1
java-8 ×1
java-stream ×1
nonblocking ×1
performance ×1
spring-boot ×1
static ×1
updating ×1