CompletableFuture.allOf 使用哪个执行器?

ita*_*ind 6 java parallel-processing java-8 completable-future

假设我们有两个执行者,1 和 2。

我们可以配置执行时使用哪个执行器

CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()-> {return 1;}, executor1) //executor1
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()-> {return 2;}, executor1) //executor1
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(()-> {return 3;}, executor2) //executor2
Run Code Online (Sandbox Code Playgroud)

但是哪个线程执行器使用 CompletableFuture 静态方法 allOf?

CompletableFuture.allOf(cf1, cf2, cf3)
Run Code Online (Sandbox Code Playgroud)

谢谢!

Did*_*r L 7

Ivan Gammel 的回答并不准确。

确实没有与CompletableFuture返回的executor 关联的 executor allOf(),因为事实上,从来没有与 any 关联的 executor CompletableFuture

一个任务是与执行相关的,因为它是运行在它的内部,但联想是逆:执行人有任务要执行的列表。

任务也可以与 关联CompletableFuture,它会在任务完成时完成。在CompletableFuture本身不保留对被用于其创建的任务或执行的参考。然而,它可能会保留对依赖阶段中使用的任务和可选的执行器的引用。

CompletableFuture通过返回allOf()将由任务,这是原来的从属阶段完成CompletableFuture秒。在您的示例中,此任务可以通过以下方式执行:

  • executor1,如果第三个任务先完成;
  • executor2, 如果前 2 个任务在第三个任务之前完成;或者
  • 原始线程,如果所有任务在您调用之前完成allOf()

这可以通过thenRun()allOf()调用中添加一个依赖阶段来看到:

public class CompletableFutureAllOfCompletion {
    private ExecutorService executor1 = Executors.newFixedThreadPool(2);
    private ExecutorService executor2 = Executors.newFixedThreadPool(2);
    private Random random = new Random();

    public static void main(String[] args) {
        new CompletableFutureAllOfCompletion().run();
    }

    public void run() {
        CompletableFuture<Integer> cf1 = supplyAsync(this::randomSleepAndReturn, executor1);
        CompletableFuture<Integer> cf2 = supplyAsync(this::randomSleepAndReturn, executor1);
        CompletableFuture<Integer> cf3 = supplyAsync(this::randomSleepAndReturn, executor2);
        randomSleepAndReturn();
        CompletableFuture.allOf(cf1, cf2, cf3)
                .thenRun(() -> System.out.println("allOf() commpleted on "
                        + Thread.currentThread().getName()));

        executor1.shutdown();
        executor2.shutdown();
    }

    public int randomSleepAndReturn() {
        try {
            final long millis = random.nextInt(1000);
            System.out.println(
                    Thread.currentThread().getName() + " waiting for " + millis);
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 0;
    }
}
Run Code Online (Sandbox Code Playgroud)

一些可能的输出:

在第一个执行者上完成(第三个任务先完成):

pool-1-thread-1 waiting for 937
pool-1-thread-2 waiting for 631
main waiting for 776
pool-2-thread-1 waiting for 615
allOf() commpleted on pool-1-thread-1
Run Code Online (Sandbox Code Playgroud)

在第二个执行者上完成(第一个和第二个任务在第三个之前完成):

pool-1-thread-1 waiting for 308
pool-1-thread-2 waiting for 788
main waiting for 389
pool-2-thread-1 waiting for 863
allOf() commpleted on pool-2-thread-1
Run Code Online (Sandbox Code Playgroud)

在主线程上完成(所有任务都在之前完成allOf().thenRun()):

pool-1-thread-1 waiting for 168
pool-1-thread-2 waiting for 292
main waiting for 941
pool-2-thread-1 waiting for 188
allOf() commpleted on main
Run Code Online (Sandbox Code Playgroud)

如何控制将在allOf()(或anyOf())之后使用的执行器

由于无法保证将使用的执行程序,因此调用这些方法之一后应*Async(, executor)调用控制将使用哪个执行程序的调用。

如果您需要返回CompletableFuture其中一个调用的结果,只需thenApplyAsync(i -> i, executor)在返回之前添加一个。