我的应用程序中有 3 种不同的方法。都回来了CompletableFuture<SomeType>。我想并行执行方法 1 和方法 2。完成方法 1 和方法 2 后,我想使用方法 1 和方法 2 返回值的参数触发方法 3。
代码示例:
CompletableFuture<Request> future1 = RequestConverter.Convert(requestDto);
CompletableFuture<String> future2 = tokenProvider.getAuthToken();
CompletableFuture<CompletableFuture<String>> future3 =
future1.thenCombine(future2,(request,token) ->
requestProcessor.Process(request,token));
Run Code Online (Sandbox Code Playgroud)
但是,与上面的代码的问题是,我得到一个CompletableFuture的CompletableFuture。我想避免这种情况并获得简单CompletableFuture<String>而不阻塞。这可能吗?
java parallel-processing nonblocking chaining completable-future
用例:
结果:异常永远不会被捕获,并且没有跟踪/记录它。在异步系统的情况下,这是 1) 不可取的和 2) 要发现的硬和隐藏问题(例如 NPE、Runtime Exc 等)的指标。
问题:与java.lang类比/以类似方式实现CompletableFuture.UncaughtExceptionHandler机制是否可行。Thread.UncaughtExceptionHandler?如果 CompletableFuture 链没有附加 java.util.concurrent.CompletableFuture.UniExceptionally Completion,则提供 [默认] 未捕获的异常处理程序/完成。
问题:如何直接从 抛出自定义异常.exceptionally()?
List<CompletableFuture<Object>> futures =
tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> businessLogic(task))
.exceptionally(ex -> {
if (ex instanceof BusinessException) return null;
//TODO how to throw a custom exception here??
throw new BadRequestException("at least one async task had an exception");
}))
.collect(Collectors.toList());
try {
List<Object> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
} catch (CompletionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause());
}
Run Code Online (Sandbox Code Playgroud)
问题:我总是得到一个CompletionExceptionwho ex.getCause()is instanceof BadRequestException。
这可能吗?
有没有办法让 mockito 在CompletableFuture.get()调用时抛出异常而不仅仅是异步方法?
例如,给定以下(不正确的)测试用例:
@Test
public void whenRunnerThrows_thenReturn5xx() throws Exception {
when(service.runAsync(any(),any())).thenThrow(new Exception(""));
mvc.perform(post("/test")
.contentType(MediaType.APPLICATION_JSON)
.content("{\"name\":\"test\"}"))
.andExpect(status().is5xxServerError());
}
Run Code Online (Sandbox Code Playgroud)
service.runAsync()在测试期间调用when ,抛出异常,这是有道理的。但是当(Spring Boot)应用程序运行时,同样的异常只会作为ExecutionException返回的原因抛出CompletableFuture::get.
编写这样的测试以便在运行应用程序时在单元测试中同时抛出异常的正确方法是什么?
我有一个遗留代码,它有十几个数据库调用来填充报告,我试图减少使用CompletableFuture.
我怀疑我是否正确地做事并且没有过度使用这项技术。
我的代码现在看起来像这样:
在每个方法中使用许多数据库调用启动文档部分的异步填充
CompletableFuture section1Future = CompletableFuture.supplyAsync(() -> populateSection1(arguments));
CompletableFuture section2Future = CompletableFuture.supplyAsync(() -> populateSection2(arguments));
...
CompletableFuture section1oFuture = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
Run Code Online (Sandbox Code Playgroud)然后我按特定顺序安排期货arrayList并加入所有期货以确保我的代码只有在所有期货都完成后才能进一步运行。
List<CompletableFuture> futures = Arrays.asList(
section1Future,
section2Future, ...
section10Future);
List<Object> futureResults = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)然后我用它的碎片填充 PDF 文档本身
Optional.ofNullable((PdfPTable) futureResults.get(0)).ifPresent(el -> populatePdfElement(document, el));
Optional.ofNullable((PdfPTable) futureResults.get(1)).ifPresent(el -> populatePdfElement(document, el));
...
Optional.ofNullable((PdfPTable) futureResults.get(10)).ifPresent(el -> populatePdfElement(document, el));
Run Code Online (Sandbox Code Playgroud)
退回文件
我的担忧是:
1) 以这种方式创建和实例化许多 Completable Futures 是否可以?将它们按要求的顺序排序arrayList,加入它们以确保它们全部完成,然后通过将它们转换为特定对象来获得结果?
2) 可以不指定执行器服务而运行,而是依赖 commonForkJoinPool吗?但是,此代码在 Web 容器中运行,所以可能为了使用 JTA,我需要通过 JNDI 使用容器提供的线程池执行程序? …
我有一个 for 循环,我正在尝试使用 CompletableFuture 对其进行并行化。
for (int i = 0; i < 10000; i++) {
doSomething();
doSomethingElse();
}
Run Code Online (Sandbox Code Playgroud)
到目前为止我所拥有的是:
for (int i = 0; i < 10000; i++) {
CompletableFuture.runAsync(() -> doSomething());
CompletableFuture.runAsync(() -> doSomethingElse());
}
Run Code Online (Sandbox Code Playgroud)
我想这可以达到目的,但是需要在所有处理开始和结束之前打印日志。如果我这样做:
log("Started doing things");
for (int i = 0; i < 10000; i++) {
CompletableFuture.runAsync(() -> doSomething());
CompletableFuture.runAsync(() -> doSomethingElse());
}
log("Ended doing things");
Run Code Online (Sandbox Code Playgroud)
这是否保证在所有 for 循环结束后将打印第二条日志语句,因为它是在单独的线程中执行的?如果没有,有没有办法在不阻塞主线程的情况下做到这一点?
java multithreading future executorservice completable-future
我有CompletableFuture哪个可以返回结果或异常。我想在出现异常和正常结果的情况下执行一些通用代码。类似于try catch finally块
当前实施
CompletableFuture<Integer> future= CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.exceptionally(ex -> {
//My Exception Handling logic
return 0;
});
Run Code Online (Sandbox Code Playgroud)
我可以把我的最终逻辑放在哪里?
我有三个功能updateFieldFromCollection1(),insertFromColletion1ToCollection2()和deleteFromCollection1()现在。
这些调用并不相互依赖,但是当我想在completableFuture 中链接它们时,它们必须按照给定的顺序一个接一个地运行。
update-> insert -> delete
Run Code Online (Sandbox Code Playgroud)
调用不返回任何内容,因此我使用的是runAsync和thenRun的completableFuture方法。并相应地将它们链接起来。我正在迭代 msgIds,它是一个字符串列表。
msgIds.stream().forEach(msgId -> CompletableFuture.runAsync(() ->
{updateFieldFromCollection1()}).thenRun(() ->
{insertFromColletion1ToCollection2()}).thenRun(() ->
{deleteFromCollection1()}));
Run Code Online (Sandbox Code Playgroud)
上面的代码有效(更新-> 插入-> 删除已完成)但会抛出像duplicateKey 和bulkwrite 这样的异常。我确信问题出现是因为在前一个线程完成其任务之前另一个线程正在启动。我希望它们异步运行,但我想通过避免冲突来规范线程。
我不确定我需要在哪里调整代码。
我正在尝试将 CompletableFuture 与在 WildFly 上运行的 EBJ bean 一起使用。
我可以看到调试器到达远程 Ejb 并成功检索结果,但在 Wayback to caller 类中我遇到了异常
java.lang.ClassNotFoundException:没有可用的类加载器
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
dnsRecords = ipaFacadeService.fetchDnsRecords();
return dnsRecords;
}).thenAccept(result -> {
if (result.size() > 0) {
//do more stuff with the result
}
});
future.get();
Run Code Online (Sandbox Code Playgroud)
当我堆栈跟踪异常时,我发现它与CompletableFuture.supplyAsync()使用的 ForkJoin 有关
[org.jboss.ejb.client.EJBInvocationHandler.doInvoke(EJBInvocationHandler.java:238),
org.jboss.ejb.client.EJBInvocationHandler.doInvoke(EJBInvocationHandler.java:183),
org.jboss.ejb.client.EJBInvocationHandler.invoke(EJBInvocationHandler.java:146),
com.sun.proxy.$Proxy107.fetchDnsRecords(Unknown Source), no.lyse.tele.prov.struts2.action.network.DnsAction.lambda$list$1(DnsAction.java:150),
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590),
java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582),
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289),
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056),
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692),
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)]
Run Code Online (Sandbox Code Playgroud) 我需要执行一些任务。有些任务是独立的,有些则依赖于其他任务的成功执行。独立的任务可以并行运行以获得更好的性能。我称这些任务为服务。该列link说明哪些服务将串行执行,哪些服务将并行执行。专栏order描述了一组定义的服务将遵循的执行顺序。对于下面的示例,服务 A 和 B 应该并行运行。如果它们已成功执行,则服务 C 将执行。请注意,服务 C 不直接依赖于其先前服务的输出,但它必须在成功执行其先前服务之后运行,因为服务 C 在其执行期间需要一些由其先前服务产生的数据。服务 C 成功执行后,下一个服务 D 将执行,如此循环下去,直到列表中的所有服务都被消费完。
Tasks service link order
Service A 01 03 1
Service B 02 03 2
Service C 03 04 3
Service D 04 05 4
Service E 05 07 5
Service F 06 07 6
Service G 07 (null) 7
Run Code Online (Sandbox Code Playgroud)
以下是我的代码。
public void executeTransactionFlow(DataVo dataVo) throws Exception {
List<Callable<Boolean>> threadList = new ArrayList<>();
List<String> serviceIds = new ArrayList<>();
List<Future<Boolean>> futureList;
String validatedRespCode …Run Code Online (Sandbox Code Playgroud) java multithreading reactive-programming java-8 completable-future
java ×8
java-8 ×4
asynchronous ×2
spring-boot ×2
chaining ×1
ejb-3.0 ×1
future ×1
jakarta-ee ×1
java-stream ×1
mockito ×1
nonblocking ×1
spring ×1
wildfly ×1