鉴于新的Java8,我们为异步任务获得了非常好的功能,例如CompletableFuture和.paralellStream().如果你在Java SE中运行它,因为我已经理解它你将使用ForkJoinPool,但是如果我在例如Wildfly或TomcatEE中运行以下示例会发生什么?
//Here I start a comp.Future without giving an Executor
test = CompletableFuture.supplyAsync(() -> timeConsumingMethod());
//Here I start a parallel stream
mList.paralell().filter(...).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)
会发生什么,我将从哪里借用我的资源
在Async Http Client文档中,我看到如何获取Future<Response>异步HTTP Get请求的结果,例如:
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
Future<Response> f = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/398")
.execute();
Response r = f.get();
Run Code Online (Sandbox Code Playgroud)
但是,为方便起见,我希望得到一个CompletableFuture<T>替代方案,我可以应用一个将结果转换为其他内容的延续,例如将响应内容从Json反序列化为Java对象(例如SoccerSeason.java).这就是我想做的事情:
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
CompletableFuture<Response> f = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/398")
.execute();
f
.thenApply(r -> gson.fromJson(r.getResponseBody(), SoccerSeason.class))
.thenAccept(System.out::println);
Run Code Online (Sandbox Code Playgroud)
根据Async Http Client文档,执行此操作的唯一方法是通过AsyncCompletionHandler<T>对象和使用promise.所以我为此构建了一个辅助方法:
CompletableFuture<Response> getDataAsync(String path){
CompletableFuture<Response> promise = new CompletableFuture<>();
asyncHttpClient
.prepareGet(path)
.execute(new AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(Response response) throws Exception {
promise.complete(response);
return response;
}
@Override …Run Code Online (Sandbox Code Playgroud) Given this piece of code:
public List<String> findPrices(String product){
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
Run Code Online (Sandbox Code Playgroud)
This part of it:
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor
)))
Run Code Online (Sandbox Code Playgroud)
Could it be rewrite as:
.map(future ->
future.thenComposeAsync(quote -> Discount.applyDiscount(quote), executor))
Run Code Online (Sandbox Code Playgroud)
I took this code from an example of a book and says the two solutions are …
我有一个创建成本很高的对象映射,因此我想创建对象并与应用程序中的其他进程并行填充映射。只有当主线程实际需要访问地图时,应用程序才应等待填充地图的异步任务完成。我怎样才能最优雅地做到这一点?
目前,我能够使用CompletableFuture.runAsync(Runnable, Executor)类似于下面的示例代码中的方式异步创建地图本身中的每个单独对象,但我不确定如何构建Future/ CompletableFuture-type 机制以Map在准备好时返回自身:
public static class AsynchronousMapPopulator {
private final Executor backgroundJobExecutor;
public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
this.backgroundJobExecutor = backgroundJobExecutor;
}
public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
for (final Entry<String, Integer> entry : input.entrySet()) {
final String className = entry.getKey();
final Integer oldValue = entry.getValue();
final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
result.put(className, oldValue + 1);
}, …Run Code Online (Sandbox Code Playgroud) multithreading asynchronous future java-8 completable-future
我目前正在使用 CompletableFuture supplyAsync() 方法将一些任务提交到公共线程池。下面是代码片段的样子:
final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
.map(resolver -> supplyAsync(() -> task.doWork()))
.collect(toList());
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();
final List<Test> tests = new ArrayList<>();
completableFutures.stream()
.map(completableFuture -> completableFuture.getNow())
.forEach(tests::addAll);
Run Code Online (Sandbox Code Playgroud)
我想知道下面的代码与上面的代码有何不同。我从下面的代码中删除了父 completableFuture,并为每个 completableFuture 添加了 join() 而不是 getNow():
final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
.map(resolver -> supplyAsync(() -> task.doWork()))
.collect(toList());
final List<Test> tests = new ArrayList<>();
completableFutures.stream()
.map(completableFuture -> completableFuture.join())
.forEach(tests::addAll);
Run Code Online (Sandbox Code Playgroud)
我在 spring 服务中使用它,并且存在线程池耗尽的问题。任何指针都深表感谢。
我有这段代码,我想重构Java 8
List<String> menus = new ArrayList<String>();
for (Menu menu : resto1.getMenu()) {
MainIngredient mainIngredient = MainIngredient.getMainIngredient(menu.getName());
if (mainIngredient.getIngredient().indexOf("Vegan")!=-1) {
menus.add(menu.getName());
}
}
Run Code Online (Sandbox Code Playgroud)
重构这个简单的循环之后,似乎代码太多......我是否正确使用CompletableFutures?
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<MainIngredient>> priceFutureList = resto1.getMenu().stream()
.map(menu -> CompletableFuture.supplyAsync(
() -> MainIngredient.getMainIngredient(menu.getName()), executorService))
.collect(Collectors.toList());
CompletableFuture<Void> allFuturesDone = CompletableFuture.allOf(
priceFutureList.toArray(new CompletableFuture[priceFutureList.size()]));
CompletableFuture<List<MainIngredient>> priceListFuture =
allFuturesDone.thenApply(v -> priceFutureList.stream()
.map(CompletableFuture::join)
.collect(toList()));
Run Code Online (Sandbox Code Playgroud) 有以下暂存代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> process1 = CompletableFuture.runAsync(() -> {
System.out.println("Process 1 with exception");
throw new RuntimeException("Exception 1");
});
CompletableFuture<Void> process2 = CompletableFuture.runAsync(() -> {
System.out.println("Process 2 without exception");
});
CompletableFuture<Void> process3 = CompletableFuture.runAsync(() -> {
System.out.println("Process 3 with exception");
throw new RuntimeException("Exception 3");
});
CompletableFuture<Void> allOfProcesses = CompletableFuture.allOf(process1, process2, process3);
allOfProcesses.get();
}
Run Code Online (Sandbox Code Playgroud)
我正在寻找如何收集并行执行期间引发的所有异常CompletableFuture.allOf()并将其映射到列表的方法。
我知道我可以通过返回异常(CompletableFuture<Exception>)而不是通过使用抛出并收集它来做到这一点CompletableFuture::join,但我认为抛出异常方法比稍后返回并抛出它更好
我有一个第三方 API,我使用 HTTP GET 请求调用它。每个请求需要几秒钟才能得到响应。
目前我正在使用 CompletableFuture,它在大小为 64 的 FixThreadPool 上执行。这会导致线程被阻塞,直到收到 GET 请求的响应,即线程在发送 GET 响应后处于空闲状态,直到收到响应。因此,我可以发送的最大并发请求数受到线程大小的限制,即此处的 64。
我可以使用什么来代替 CompletableFuture,这样我的线程就不会闲置等待响应?
我收到这个异常:
Caused by: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) ~[?:1.8.0_302]
at com.tcom.concurrent.ConcurrentUtils$3.onSuccess(ConcurrentUtils.java:140) ~[framework-20220815.38-RELEASE.jar:?]
Run Code Online (Sandbox Code Playgroud)
这种异常很少见,并且在我的系统中无法重现。
查看 CompletableFuture.java 似乎fFunction 变量为 null。
但第二行有一个空检查uniApply(),所以f不能为空。
那么为什么 JVM 声称我在函数调用行上有 NPE 呢?如果 NPE 来自调用的函数内部,我不应该在堆栈跟踪中看到它吗?
public static <I, O> CompletableFuture<O> buildCompletableFuture(final ListenableFuture<I> listenableFuture,
final Function<I, O> responseApplier,
final Consumer<I> onSuccessConsumer,
final Consumer<Throwable> onFailureConsumer,
final Supplier<O> defaultValueOnExceptionSupplier,
final Executor callBackExecutor) {
//create an instance of CompletableFuture
final CompletableFuture<I> innerComplete = new CompletableFuture<I>() {
@Override
public …Run Code Online (Sandbox Code Playgroud) java asynchronous nullpointerexception java-8 completable-future
我的新团队正在编写一个 Java gRPC 服务,为了确保我们永远不会阻塞请求线程,我们最终将或多或少的所有方法包装在 CompletableFuture 中,即使这些端点在概念上是操作的顺序列表(无并行性)。
\n所以代码看起来像这样(如果需要,最后可以提供 Java 示例):
\n methodA()\n methodB()\n methodD() (let say this one is a 15ms RPC call)\n methodE()\n methodC()\n methodF() (let say this one is a 5ms CPU intensive work)\n methodG()\n \nRun Code Online (Sandbox Code Playgroud)\n语境:
\n编辑1:昨天在网上进行了更多阅读后,我明白,当且仅当我们使用真正的非阻塞HTTP和DB客户端(并且看起来JDBC不是非阻塞的)时,这种模式可以减少所需的线程总数。我的理解是,如果我们有足够的内存来为每个请求保留一个线程,那么使用同步代码仍然可能是最有效的实现(减少切换线程和加载数据的开销),但是如果我们没有足够的内存为了保持那么多线程处于活动状态,那么使整个代码成为非阻塞的概念可以减少线程数量,从而允许应用程序扩展到更多请求。
\n问题一: \n我知道这会解锁“请求线程”,但实际上有什么好处?我们真的节省了 CPU 时间吗?在下面的示例中,感觉“某些”线程无论如何都会一直处于活动状态(在下面的示例中,主要是来自 methodD 中 CompletableFuture.supplyAsync 的线程),只是碰巧它\xe2\x80\x99s 不一样线程作为接收初始请求的线程。
\n问题 …
java ×8
asynchronous ×6
java-8 ×5
future ×1
grpc ×1
java-ee ×1
java-ee-7 ×1
java-stream ×1
java-threads ×1
nonblocking ×1
spring ×1
threadpool ×1