CompletableFuture.exceptionally()方法采用lambda,但该方法没有采用自定义Executor的风格,甚至没有采用“ ... Async”风格的风格。
lambda可以在哪个执行器上运行?运行原始CompletableFuture引发异常的是同一执行者吗?或者(如果是这种情况,我会感到惊讶)是commonPool吗?
构建 API 时,向接口编码是一种很好的做法,因此返回 CompletionStage 似乎是最好的方法。然而我意识到我总是在获得 CompletionStage 后调用 .toCompletableFuture 。在这种情况下,推荐的方法是什么?
我正在寻找Futures.successfulAsList()用Java 8的CompletableFuture代码替换Guava的规范代码。
我认为这CompletableFuture.allOf()似乎可以替代Futures.allAsList(),但是我看不到任何类似的东西successfulAsList()。
在Oracle 站点上阅读这篇文章https://community.oracle.com/docs/DOC-995305 后,我正在尝试实施“一些二选一选择模式”段落中描述的模式。最后一类模式也包含二对一模式。但这一次,不是执行一次下游元素,而是完成了两个上游元素,当两个上游元素之一完成时执行下游元素。例如,当我们想要解析域名时,这可能非常有用。我们可能会发现查询一组域名服务器比只查询一个域名服务器更有效。我们不希望从不同的服务器得到不同的结果,所以我们不需要比我们得到的第一个答案更多的答案。
实现一个场景很简单,我只有 2 个 CompleatableFuture,但我无法用 3 个或更多 CompleatableFuture 实现相同的场景。
我试过这个:
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));
cf1.applyToEither(
cf2, s1 -> cf2.applyToEither(
cf3, s2 -> cf3.applyToEither(
cf4, s3 -> "First result is: " + s3))).thenAccept(System.out::println).join();
Run Code Online (Sandbox Code Playgroud)
FutureMain 是我的类,这是 generateString 方法
public static String generateString(String input) {
Random r = new Random();
int millis = r.nextInt(6) * 1000;
System.out.println(input + " …Run Code Online (Sandbox Code Playgroud) 我正在用Java编写Play2应用程序服务方法,该方法应执行以下操作。异步调用方法A,如果失败,则异步调用方法B。
为了说明,假设该接口用于服务调用的后端:
public interface MyBackend {
CompletionStage<Object> tryWrite(Object foo);
CompletionStage<Object> tryCleanup(Object foo);
}
Run Code Online (Sandbox Code Playgroud)
因此,在我的服务方法中,我想返回一个可以完成以下操作的Future:
(注意:当然tryWrite()本身可以进行任何清理,这是一个简化的示例来说明问题)
在我看来,像这样调用后端的服务的实现似乎很困难,因为CompletionStage.exceptionally()方法不允许进行组合。
版本1:
public class MyServiceImpl {
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
CompletionStage<Object> writeFuture = myBackend.tryWrite(foo)
.exceptionally((throwable) -> {
CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo);
throw new RuntimeException(throwable);
});
return writeFuture;
}
}
Run Code Online (Sandbox Code Playgroud)
因此,版本1以非阻塞方式调用tryCleanup(foo),但tryWriteWithCleanup()返回的CompletionStage不会等待cleanupFuture完成。如何更改此代码以从也将等待cleanupFuture完成的服务返回将来?
版本2:
public class MyServiceImpl {
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
final AtomicReference<Throwable> saveException = new AtomicReference<>();
CompletionStage<Object> writeFuture = myBackend
.tryWrite(foo)
.exceptionally(t -> {
saveException.set(t);
// continue with cleanup
return null;
}) …Run Code Online (Sandbox Code Playgroud) java asynchronous exception-handling java-8 completable-future
我的直觉是下面的代码是错误的。我相信因为正在使用 join() ,所以在完成 future 时抛出的任何异常都将不受检查。那么当调用 get() 时,将不会有检查异常,不会记录任何错误,并且在失败期间难以诊断错误。
List<CompletableFuture> list = ImmutableList.of(future1, future2);
CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()])).join();
try {
result1 = future1.get();
result2 = future2.get();
} catch (InterruptedException | ExecutionException e) {
// will this ever run if join() is already called?
}
Run Code Online (Sandbox Code Playgroud)
我已经浏览了 CompletableFuture 的文档,但没有找到我的问题的确切答案。我在这里问,然后将阅读源代码。
我可以看到 catch 块代码将运行的唯一原因是,如果以某种方式可以将已检查的异常保存在某些执行上下文中,并且不会在 join() 中抛出(或由未经检查的异常包装抛出),然后以某种形式再次抛出得到()。这对我来说似乎不太可能。
所以我的最终问题是,catch 块代码会运行吗?
我想创建一个CompletableFuture已经完成的异常.
Scala通过工厂方法提供我正在寻找的东西:
Future.failed(new RuntimeException("No Future!"))
Run Code Online (Sandbox Code Playgroud)
在Java 10或更高版本中是否有类似的东西?
我正在阅读java 8的行动,第11章(关于CompletableFutures),它让我思考我公司的代码库.
动作书中的java 8说如果你有像我下面写的代码,你一次只能使用4 CompletableFuture秒(如果你有4核计算机).这意味着如果你想异步执行10个操作,你将首先运行前4 CompletableFuture秒,然后是第2个4,然后是剩下的2个,因为默认ForkJoinPool.commonPool()只提供等于的线程数Runtime.getRuntime().availableProcessors().
在我公司的代码库中,有一个@Service名为AsyncHelpers的类,它们包含一个方法load(),该方法使用CompletableFutures在不同的块中异步加载有关产品的信息.我想知道他们一次只使用4个线程.
我公司的代码库中有几个这样的异步助手,例如,一个用于产品列表页面(PLP),另一个用于产品详细信息页面(PDP).产品详细信息页面是专门针对特定产品的页面,显示其详细特征,交叉销售产品,类似产品和更多内容.
有一个架构决定以块的形式加载pdp页面的细节.加载应该是异步发生的,当前代码使用CompletableFutures.我们来看看伪代码:
static PdpDto load(String productId) {
CompletableFuture<Details> photoFuture =
CompletableFuture.supplyAsync(() -> loadPhotoDetails(productId));
CompletableFuture<Details> characteristicsFuture =
CompletableFuture.supplyAsync(() -> loadCharacteristics(productId));
CompletableFuture<Details> variations =
CompletableFuture.supplyAsync(() -> loadVariations(productId));
// ... many more futures
try {
return new PdpDto( // construct Dto that will combine all Details objects into one
photoFuture.get(),
characteristicsFuture.get(),
variations.get(),
// .. many more future.get()s
); …Run Code Online (Sandbox Code Playgroud) 我想链接一个CompletableFuture,以便它在处理过程中呈扇形散开。我的意思是我对列表有一个开放的CompletableFuture,并且我想对列表中的每个项目应用计算。
第一步是调用m_myApi.getResponse(request,executor)发出异步调用。
该异步调用的结果具有getCandidates方法。我想同时解析所有这些候选人。
目前,我的代码按顺序解析它们
public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
return candidates.thenApplyAsync(response -> response.getCandidates()
.stream()
.map(MyParser::ParseCandidates)
.collect(Collectors.toList()));
}
Run Code Online (Sandbox Code Playgroud)
我想要这样的东西:
public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
return candidates.thenApplyAsync(response -> response.getCandidates()
.stream()
.PARSE_IN_PARALLEL_USING_EXECUTOR
}
Run Code Online (Sandbox Code Playgroud) 不传递Executorto CompletableFuture.runAsync(),ForkJoinPool则使用公共。相反,对于我想要异步执行的简单任务(例如,我不需要链接不同的任务),我可能只使用ForkJoinPool.commonPool().execute().
为什么应该优先考虑另一个?例如,是否runAsync()有任何实质性的开销execute()?前者比后者有什么特别的优势吗?
java multithreading asynchronous forkjoinpool completable-future
java ×7
java-8 ×7
asynchronous ×5
api ×1
exception ×1
forkjoinpool ×1
future ×1
guava ×1
java-10 ×1
nonblocking ×1