我有一个异步执行的查询输入流。我想确保当我使用时Completablefuture::join,这些要求的结果是按照输入查询流的顺序收集的。
这是我的代码的样子:
queries.stream()
.map(query -> CompletableFuture.supplyAsync(() -> {
try {
return SQLQueryEngine.execute(query);
} catch (InternalErrorException e) {
throw new RuntimeException(e);
}
}))
.map(CompletableFuture::join)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
SQLQueryEngine.execute(查询); 返回一个List<Results>so 输出是List<List<Result>。我想展平并将所有结果合并到一个列表中。如果我在收集之前使用 .flatMap(List::stream) 来扁平化,它会保持排序吗?
从请求范围的线程中,CompletableFuture必须由执行器中运行的任务来完成。所提供的供应商使用会话范围内的特定于域的服务MessageService。该服务是由 Guice 注入的。
public class MessageProcessingPage {
private MessageService messageService;
@Inject
public MessagProcessingPage (MessageService messageService) {
this.messageService = messageService;
}
// Called by request scoped thread.
public void onProcessMessagesButton () {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(
// Called from a thread from the threadpool.
() -> {return messageService.retrieveMessageMetadataSet(x, y);}
, executorService);
...
}
...
}
Run Code Online (Sandbox Code Playgroud)
有一个被注入的MessageService(会话范围) 。MessageRestClient
@SessionScoped
public class MessageService {
private MessageRestClient messageRestClient;
@Inject
public MessageRestClient (MessageRestClient messageRestClient) {
this.messageRestClient = …Run Code Online (Sandbox Code Playgroud) 我有以下组成部分:
private JobInfo aggregateJobInfo() {
final JobsResult jobsResult = restClient().getJobs();
final List<String> jobIds = extractJobIds(jobsResult);
//fetch details, exceptions and config for each job
final List<JobDetails> jobDetails = jobIds.stream().map(jobId -> {
final JobDetailResult jobDetailResult = restClient().getJobDetails(jobId);
final JobExceptionsResult jobExceptionsResult = restClient().getJobExceptions(jobId);
final JobConfigResult jobConfigResult = restClient().getJobConfig(jobId);
return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
}).collect(Collectors.toList());
return new JobInfo(jobsResult, jobDetails);
}
private static List<String> extractJobIds(final JobsResult jobsResult) {
final ArrayList<String> jobIds = new ArrayList<>();
jobIds.addAll(jobsResult.getRunning());
jobIds.addAll(jobsResult.getFinished());
jobIds.addAll(jobsResult.getCanceled());
jobIds.addAll(jobsResult.getFailed());
return jobIds;
}
Run Code Online (Sandbox Code Playgroud)
它只是调用一些ENDPOINTS并修饰一些数据。现在,我试图通过使用CompletableFutures来实现这种非阻塞性,而我之前从未真正使用过。
private CompletableFuture<JobInfo> aggregateJobInfo() …Run Code Online (Sandbox Code Playgroud) 我有以下代码(或多或少):
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture
.supplyAsync(()->{
return longRunningMethodThatReturnsBoolean();
}, executor)
.thenAcceptAsync(taskResult -> {
logResult();
executor.shutdown();
}, executor);
Run Code Online (Sandbox Code Playgroud)
这允许主线程中的代码继续,但是我希望主线程在完成时死亡,并且未来继续在它自己的线程中工作,但是主线程保持活动状态直到 CompletableFuture 完成,即使主线程没有不再做任何事情。
我对这个有点陌生,我错过了什么吗?甚至有可能吗?
任何帮助将不胜感激!!!
我有一个库 xyz,它为我提供了一个 CompletableFuture,我想在我的 Vertx (v3.5) 事件循环中处理它。目前我正在使用CompletableFuture.handle(BiFunction)(见下文),但我想使用CompletableFuture.handleAsync(BiFunction, Executor),但我无法弄清楚如何在此方法调用中向第二个参数提供 vertx 事件循环线程.
我尝试在Vertx.runOnContext() 中执行整个代码,但 handleAsync 内部的调用仍然在 Java 的 ForkJoin 池上执行,我想避免这种情况。
CompletableFuture<Void> f = xyz.someMethod();
f.handle((v, th) -> { //Want to run this in handleAsync()
if (th == null) {
future.complete(null);
} else {
future.completeExceptionally(th);
}
return null;
});
Run Code Online (Sandbox Code Playgroud) 此代码引自Java 8 in Action,也出现在本书11.4.3中.
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
Run Code Online (Sandbox Code Playgroud)
沿着代码,编写器附上如下图,表示applyDiscount()同一个线程中的工作getPrice(),我强烈怀疑:这里有两个不同的Async后缀,这意味着第二个调用应该在另一个线程中.
我使用以下代码在本地测试它:
private static void testBasic() {
out.println("*****************************************");
out.println("********** TESTING thenCompose **********");
CompletableFuture[] futures = IntStream.rangeClosed(0, LEN).boxed()
.map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), EXECUTOR_SERVICE))
.map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> runStage2(i), EXECUTOR_SERVICE)))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
}
Run Code Online (Sandbox Code Playgroud)
输出进一步证明了我的想法,这是正确的吗?
***************************************** …Run Code Online (Sandbox Code Playgroud) 我将使用 CompletableFuture 进行异步调用。我想知道是否有办法延迟其执行。我对一个流有大约 5 个异步调用,并且我正在链接,然后根据需要使用 thenApply/thenCompose。我的问题是当我为调用创建第一个 CF 时,只要线程空闲,它就会开始执行。我希望首先我的所有任务都应该被链接起来,然后当我调用,说完成()时,它开始执行。我正在寻找的是类似于 Java 流中的中间操作。在我之前关于 SO 的一个问题中,我对此有一些帮助,但这并没有解决我的目的。
我的技术栈只允许 Java 8,所以不能使用下一个版本中推出的任何功能。
当所有给定的期货完成时,Java 8CompletableFuture.allOf(CompletableFuture<?>...cfs)将返回一个CompletableFuture完成的,或者CompletionException如果其中一个期货完成并出现异常则抛出一个。
如果我的一个期货异常完成,会CompletableFuture.allOf等待剩余的期货完成后再抛出CompletionException还是会取消剩余的期货?
如果它等待所有期货完成,有什么办法让它在任何期货抛出异常时立即返回并取消剩余的期货?
我有一个相当普遍或独特的要求。例如,我有以下AccountDetails列表:
List<AccountDetails>
class AccountDetails {
String bankAccountId;
String mortgageAccountId;
Integer noOfTrans;
String addressLine;
String externalLink;
}
Run Code Online (Sandbox Code Playgroud)
上述所有字段,除了bankAccountId从外部 REST 服务调用中提取的。我想并行调用所有 REST 服务并更新列表中的每个对象:
所以,它看起来像下面:
对于每个 accountDetails
martgageAccountId字段(REST 返回 MortgageInfo 对象)noOfTrans字段(REST 返回Transactions对象)addressLine字段(REST 返回Address对象)externalLink字段。(REST 返回Links对象)我希望并行执行上述所有调用,并且针对AcccountDetails列表中的每个对象。如果有异常,我想优雅地处理它。请注意,上述每个 REST 服务都返回不同的自定义对象
我对如何通过CompletableFuture链接实现这一点感到困惑。不确定allOf或thenCombine(只需要两个),或者thenCompose应该使用以及如何将所有这些放在一起。
任何例子/想法?
我正在学习 CompletableFuture API,有一个例子:
CompletableFuture.completedFuture(url)
.thenComposeAsync(this::readPage, executor)
.thenApply(this::getImageURLs)
.thenApply(this::saveFoundImages)
.....
Run Code Online (Sandbox Code Playgroud)
我有一个问题:如果我将thenComposeAsync(...)方法作为第一个调用,链中的其他方法会在executor我通过参数传递的方法中执行,还是应该使用async调用其他方法以在特定执行程序中异步执行?
java concurrency multithreading asynchronous completable-future
java ×9
java-8 ×6
asynchronous ×3
java-stream ×2
concurrency ×1
guice ×1
spring-boot ×1
vert.x ×1