为什么parallelStream不使用整个可用的并行性?

Ais*_*war 8 java multithreading fork-join java-8 java-stream

我有一个自定义ForkJoinPool创建的并行度为25.

customForkJoinPool = new ForkJoinPool(25);
Run Code Online (Sandbox Code Playgroud)

我有一个包含700个文件名的列表,我使用这样的代码从S3并行下载文件并将它们转换为Java对象:

customForkJoinPool.submit(() -> {
   return fileNames
     .parallelStream()
     .map((fileName) -> {
        Logger log = Logger.getLogger("ForkJoinTest");
        long startTime = System.currentTimeMillis();
        log.info("Starting job at Thread:" + Thread.currentThread().getName());
        MyObject obj = readObjectFromS3(fileName);
        long endTime = System.currentTimeMillis();
        log.info("completed a job with Latency:" + (endTime - startTime));
        return obj;
     })
     .collect(Collectors.toList);
   });
});
Run Code Online (Sandbox Code Playgroud)

当我查看日志时,我看到只使用了5个线程.平行度为25,我预计这将使用25个线程.下载文件并将文件转换为对象的平均延迟时间约为200毫秒.我错过了什么?

可能更好的问题是并行流如何在为其创建线程之前分析原始列表的分割量?在这种情况下,看起来它决定将它拆分5次并停止.

Mis*_*sha 6

你为什么这样做ForkJoinPool?它适用于CPU绑定任务,其子任务太快,无法保证个人调度.您的工作负载是IO限制的,延迟时间为200毫秒,单个调度开销可以忽略不计.

使用Executor:

import static java.util.stream.Collectors.toList;
import static java.util.concurrent.CompletableFuture.supplyAsync;

ExecutorService threads = Executors.newFixedThreadPool(25);

List<MyObject> result = fileNames.stream()
        .map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads))
        .collect(toList()).stream()
        .map(CompletableFuture::join)
        .collect(toList());
Run Code Online (Sandbox Code Playgroud)


Ste*_*n C 5

我认为答案就在这...来自ForkJoinPooljavadoc。

“即使某些任务因等待加入而停滞不前,该池也试图通过动态添加,暂停或恢复内部工作线程来维护足够的活动(或可用)线程。但是,面对这样的调整,我们无法保证此类调整: O或其他不受管理的同步。”

就您而言,下载将执行阻止的I / O操作。