我试图将方法的调用/结果链接到下一个调用.我得到编译时错误方法E因为如果无法从前一次调用获得objB的引用.
如何将上次调用的结果传递给下一个链?我完全误解了这个过程吗?
Object objC = CompletableFuture.supplyAsync(() -> service.methodA(obj, width, height))
.thenApply(objA -> {
try {
return service.methodB(objA);
} catch (Exception e) {
throw new CompletionException(e);
}
})
.thenApply(objA -> service.methodC(objA))
.thenApply(objA -> {
try {
return service.methodD(objA); // this returns new objB how do I get it and pass to next chaining call
} catch (Exception e) {
throw new CompletionException(e);
}
})
.thenApply((objA, objB) -> {
return service.methodE(objA, objB); // compilation error
})
.get();
Run Code Online (Sandbox Code Playgroud) 我正在使用Async Http Client 库(使用 Netty)向 RESTful API 发出异步 Http Get 请求。由于我想保留非阻塞行为,因此我将CompletableFuture<T>作为 Http Get 请求的结果返回的实例。因此,在 RESTful API 端点返回一个 Json 数组的地方,我返回一个CompletableFuture<T[]>.
然而,根据 Erik Meijer 对编程中的四种基本效果所做的分类,我认为这Stream<T>更适合于发出异步 Http Get 请求并返回 Json 数组的 Java 方法的结果。在这种情况下,我们可以将Stream<T>视为Observable<T>等价物,它是返回许多值的异步计算的结果。
所以,考虑到这resp持有响应,那么我可以得到CompletableFuture<Stream<T>>如下:
CompletableFuture<T[]> resp = …
return resp.thenApply(Arrays::stream);
Run Code Online (Sandbox Code Playgroud)
但是,我想知道如何将 the 转换CompletableFuture<Stream<T>> resp为 a Stream<T>,而无需等待计算完成(即我不想在get()调用时阻塞)?
我希望得到与以下表达式相同的结果,但不阻塞get():
return resp.thenApply(Arrays::stream).get();
Run Code Online (Sandbox Code Playgroud) 我正在使用CompletableFuture来异步执行从列表源生成的流.
所以我正在测试重载方法,即CompletableFuture的"supplyAsync",其中一个方法只接受单个供应商参数,另一个方法接受供应商参数和执行者参数.以下是两者的文档:
一
supplyAsync(供应商供应商)
返回由ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
第二
supplyAsync(供应商供应商,执行执行人)
返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
这是我的测试类:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration); …Run Code Online (Sandbox Code Playgroud) executorservice java-8 threadpoolexecutor forkjoinpool completable-future
假设我们有两个执行者,1 和 2。
我们可以配置执行时使用哪个执行器
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()-> {return 1;}, executor1) //executor1
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()-> {return 2;}, executor1) //executor1
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(()-> {return 3;}, executor2) //executor2
Run Code Online (Sandbox Code Playgroud)
但是哪个线程执行器使用 CompletableFuture 静态方法 allOf?
CompletableFuture.allOf(cf1, cf2, cf3)
Run Code Online (Sandbox Code Playgroud)
谢谢!
我试图了解CompletableFutureJava 8 的工作原理。下面的代码按预期工作
CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync Thread name " + Thread.currentThread().getName());
return "str";
}).thenApply(str -> {
System.out.println("thenApply Thread name " + Thread.currentThread().getName());
return str;
}).thenApply(str1 -> {
System.out.println("thenApply Thread name " + Thread.currentThread().getName());
return str1;
}).thenAccept(str3 -> {
System.out.println("thenAccept Thread name " + Thread.currentThread().getName());
});
System.out.println("Thread name " + Thread.currentThread().getName());
Run Code Online (Sandbox Code Playgroud)
输出:
supplyAsync Thread name ForkJoinPool.commonPool-worker-1
thenApply Thread name main
thenApply Thread name main
thenAccept Thread name main
Thread name main
Run Code Online (Sandbox Code Playgroud)
但是当我进行一些计算时,它没有按预期工作,如果我遗漏了什么,请纠正我。
CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync Thread name …Run Code Online (Sandbox Code Playgroud) 所以我有几个我想运行的期货,即使有些失败,我希望所有人都有机会运行。所以如果我这样做:
CompletableFuture.allOf(futures).join()
Run Code Online (Sandbox Code Playgroud)
会是这样吗?我的推理是,每个未来在其执行器中都有自己的队列作业,因此如果主线程没有先完成,所有未来都会运行。我的问题是我特别.join()关注.allOf()所以我的应用程序在运行所有内容之前不会结束
所以allOf()语义让我感到困惑:无论是否成功,当所有传递的期货都完成时,未来的回报是否会完成?或者,如果它看到一个失败而没有等待其余的,它会完成一个失败的未来吗?
编辑
为了进一步说明我的问题,.allOf行为是这样的:
Stream.of(futures).forEach(future -> {
try {
future.join()
} catch (Throwable e) {
//dont throw, we want to join the rest
}
})
Run Code Online (Sandbox Code Playgroud)
或者它的行为是否如下所示:
Stream.of(futures).forEach(future -> {
try {
future.join()
} catch (Throwable e) {
throw e; //All other remaining .join() wont run
}
})
Run Code Online (Sandbox Code Playgroud)
是哪个?第一种情况还是第二种情况?因为我想要第一种情况,这就是我暂时在代码中使用的情况,但allOf()如果可能的话,我想使用它,因为它更美观
谢谢!
我有四个服务。
One will give hotels which is matching to the query.
One will give City which is matching to the query.
One will give Airports which is matching to the query.
One will give Metro stations which is matching to the query.
Run Code Online (Sandbox Code Playgroud)
对于每个 HTTP 请求,我将命中这四个服务并合并来自四个服务的响应并返回。
所以我的示例代码就像
for(EntityGroups entityGroups: EntityGroups.values()){
Callable<Response> callable = new XCallable(entityTypetoFetch, searchQuery);
Future<List<Entity>> listFuture = executor.submit(callable);
entityTypeAndFutureMap.put(entityTypetoFetch, listFuture);
}
Run Code Online (Sandbox Code Playgroud)
在此之后,我在 for 循环中得到所有响应
trainStationList = entityTypeAndFutureMap.get(EntityGroups.TRAIN_STATION).get();
landmarkEntities = entityTypeAndFutureMap.get(EntityGroups.LANDMARK_GROUP).get();
cityEntities = entityTypeAndFutureMap.get(EntityGroups.CITY_GROUP).get();
hotelEntities = entityTypeAndFutureMap.get(EntityGroups.HOTEL_GROUP).get();
Run Code Online (Sandbox Code Playgroud)
因为我希望所有列表合并并做出最终响应。我正在拨打阻塞电话。我只想合并这些列表并返回。如果我在这里使用 completablefuture 会有帮助吗? …
我想触发许多一次性异步 CompletableFutures,如下所示:
for (Job job : jobs) {
CompletableFuture.supplyAsync(() -> job.process())
.whenComplete(this::doSomething);
}
Run Code Online (Sandbox Code Playgroud)
理想情况下,这些 CompletableFutures 可以在whenComplete完成后被垃圾回收。但是,由于我没有存储参考文献,因此是否存在事先收集它们的风险?
假设我想在遇到特定异常时使用某个值进行恢复,否则返回带有异常的失败未来。我希望是这样的:
public static void main(String[] args) {
CompletableFuture
.supplyAsync(FuturesExample::fetchValue)
.exceptionally(throwable -> {
if (throwable instanceof RuntimeException) {
return "All good";
}
throw throwable; // does not compile
});
}
public static String fetchValue() {
// code that potentially throws an exception
return "value";
}
Run Code Online (Sandbox Code Playgroud)
如果fetchValue函数会抛出一个已检查的异常,我想在链式方法中处理它。return throwable和都试过了throw throwable,但都没有编译。是否CompletableFuture为这种情况提供了任何解决方案?我知道Function作为exceptionally方法参数的接口不会抛出任何异常 - 在这种情况下,我只想返回已经失败的未来。我想找到一个使用 Java 8 的解决方案。
有一个代码是由某人编写的,他调用了 Completable Future 来做一个调用并忘记的活动,即输出没有得到照顾(没有get()调用)。
CompletableFuture.runAsync(() -> {
List<ActivityListTO> activities = auditCount(requestType, oldLimit, rule, newLimit);
few mores lines here.
...
, executor);
Run Code Online (Sandbox Code Playgroud)
我正在对此进行单元测试,但无法执行异步内部的代码。
我不能静态模拟它,因为我需要执行其中的代码。我尝试在我的测试用例中传递线程池实例,但它没有通过。
你能给我一些关于如何对异步中的代码进行单元测试的提示吗?
java ×8
java-8 ×6
asynchronous ×4
lambda ×2
exception ×1
forkjoinpool ×1
future ×1
java-stream ×1
junit4 ×1
semantics ×1
threadpool ×1
unit-testing ×1