Java CompletableFuture.complete() 块

QUA*_*ang 5 java multithreading future reactor completable-future

在 java 中使用 CompletableFuture 时遇到问题。我有 2 个选择请求,它们是在从服务器接收响应时填充的。

在连接线程(THREAD-1)(使用反应器)中,我使用:

if(hasException) {
   selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
   System.out.println("Before complete future");
   selectFuture.complete(result);
   System.out.println("After complete future");
}
Run Code Online (Sandbox Code Playgroud)

在其他线程(THREAD-2)中,我使用:

CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
   System.out.println("Receive all future");
   // Do sth here
});
Run Code Online (Sandbox Code Playgroud)

我的情况是系统打印出“接收所有未来”但调用时 THREAD-1 被阻止future.complete(result);它无法退出该命令。如果在 THREAD-2 中,我使用CompletableFuture.allOf(allOfSelect).get(),THREAD-1 将正确运行。但是使用CompletableFuture.get()会降低性能,所以我想使用CompletableFuture.whenComplete().

谁能帮我解释一下阻塞的原因?

谢谢!

Sot*_*lis 6

complete调用会触发所有相关的CompletionStages。

因此,如果您之前已BiConsumer用注册了whenCompletecomplete则将在其调用线程中调用它。在您的情况下,调用complete将在BiConsumer您传递给whenComplete完成时返回。这在类 javadoc 中描述

为非异步方法的依赖完成提供的操作可以由完成当前 的线程 CompletableFuture或完成方法的任何其他调用者执行。

另一个调用者是相反的情况,如果目标已经完成,线程调用whenComplete实际上会应用。)BiConsumer CompletableFuture

这是一个说明行为的小程序:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    future.whenComplete((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
}
Run Code Online (Sandbox Code Playgroud)

这将打印

Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done
Run Code Online (Sandbox Code Playgroud)

显示BiConsumer已应用在主线程中,即调用complete.

您可以使用whenCompleteAsync来强制BiConsumer在单独的线程中执行 。

[...]在此阶段完成时使用此阶段的默认异步执行工具执行给定操作。

例如,

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
    done.get();
}
Run Code Online (Sandbox Code Playgroud)

将打印

Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
Run Code Online (Sandbox Code Playgroud)

显示BiConsumer已应用在单独的线程中。