QUA*_*ang 5 java multithreading future reactor completable-future
在 java 中使用 CompletableFuture 时遇到问题。我有 2 个选择请求,它们是在从服务器接收响应时填充的。
在连接线程(THREAD-1)(使用反应器)中,我使用:
if(hasException) {
selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
System.out.println("Before complete future");
selectFuture.complete(result);
System.out.println("After complete future");
}
Run Code Online (Sandbox Code Playgroud)
在其他线程(THREAD-2)中,我使用:
CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
System.out.println("Receive all future");
// Do sth here
});
Run Code Online (Sandbox Code Playgroud)
我的情况是系统打印出“接收所有未来”但调用时 THREAD-1 被阻止future.complete(result);它无法退出该命令。如果在 THREAD-2 中,我使用CompletableFuture.allOf(allOfSelect).get(),THREAD-1 将正确运行。但是使用CompletableFuture.get()会降低性能,所以我想使用CompletableFuture.whenComplete().
谁能帮我解释一下阻塞的原因?
谢谢!
该complete调用会触发所有相关的CompletionStages。
因此,如果您之前已BiConsumer用注册了whenComplete,complete则将在其调用线程中调用它。在您的情况下,调用complete将在BiConsumer您传递给whenComplete完成时返回。这在类 javadoc 中描述
为非异步方法的依赖完成提供的操作可以由完成当前 的线程
CompletableFuture或完成方法的任何其他调用者执行。
(另一个调用者是相反的情况,如果目标已经完成,线程调用whenComplete实际上会应用。)BiConsumer CompletableFuture
这是一个说明行为的小程序:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
future.whenComplete((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
}
Run Code Online (Sandbox Code Playgroud)
这将打印
Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done
Run Code Online (Sandbox Code Playgroud)
显示BiConsumer已应用在主线程中,即调用complete.
您可以使用whenCompleteAsync来强制BiConsumer在单独的线程中执行 。
[...]在此阶段完成时使用此阶段的默认异步执行工具执行给定操作。
例如,
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
done.get();
}
Run Code Online (Sandbox Code Playgroud)
将打印
Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
Run Code Online (Sandbox Code Playgroud)
显示BiConsumer已应用在单独的线程中。