使用 Java 中的 CompletableFuture 并行执行 for 循环并记录执行

moh*_*ank 5 java multithreading future executorservice completable-future

我有一个 for 循环,我正在尝试使用 CompletableFuture 对其进行并行化。

for (int i = 0; i < 10000; i++) {
    doSomething();
    doSomethingElse();
}
Run Code Online (Sandbox Code Playgroud)

到目前为止我所拥有的是:

for (int i = 0; i < 10000; i++) {
    CompletableFuture.runAsync(() -> doSomething());
    CompletableFuture.runAsync(() -> doSomethingElse());
}
Run Code Online (Sandbox Code Playgroud)

我想这可以达到目的,但是需要在所有处理开始和结束之前打印日志。如果我这样做:

log("Started doing things");
for (int i = 0; i < 10000; i++) {
    CompletableFuture.runAsync(() -> doSomething());
    CompletableFuture.runAsync(() -> doSomethingElse());
}
log("Ended doing things");
Run Code Online (Sandbox Code Playgroud)

这是否保证在所有 for 循环结束后将打印第二条日志语句,因为它是在单独的线程中执行的?如果没有,有没有办法在不阻塞主线程的情况下做到这一点?

isn*_*bad 7

我想CompletableFuture是您需要的错误概念。如果您想并行执行任意数量的类似任务,最简单的方法是invokeAll(...)在 an 上使用该方法ExecutionService

// First, create a list with all tasks you want to execute in parallel
List<Callable<?>> tasks = new ArrayList<>(10000);
for (int i = 0; i < 10000; ++i) {
    // we need to create Callables, so if your doSomething method returns void, we have to convert it to a Callable using helper method from class Executors
    tasks.add(Executors.callable(this::doSomething));
}

// Then, create an executor service that can execute these tasks
// There are different executors you can choose from, I take one that has a fixed pool of threads
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

// Last but not least, call invokeAll to execute all tasks and wait for them to complete
executorService.invokeAll(tasks);

// This method will be called when all tasks have been completed successfully:
System.out.println("done");
Run Code Online (Sandbox Code Playgroud)

  • 是的。但是您写道,您希望在所有任务完成后调用第二条日志语句。这正是阻塞的含义。 (2认同)

kai*_*sch 5

您必须收集所有 CompletableFutures 并等待它们完成:

log("Started doing things");
List<CompletableFuture> futures = new ArrayList();
for (int i = 0; i < 10000; i++) {
    futures.add(CompletableFuture.runAsync(() -> doSomething()));
    futures.add(CompletableFuture.runAsync(() -> doSomethingElse()));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                 .thenRunAsync(() -> log("Ended doing things"));
Run Code Online (Sandbox Code Playgroud)

或者当您使用 ExecutorService 时:

CompletableFuture.runAsync(() -> {
    try {
        executorService.invokeAll(tasks);
    } catch (InterruptedException) {
        e.printStackTrace();
    }
    log("Ended doing things");
});
Run Code Online (Sandbox Code Playgroud)