parallelStream 中的线程大小大于 CPU 内核

Mic*_*elM 3 java

默认情况下,parallelStream 中的 commonPool 的大小应该是cpu_cores - 1.

但是,在我的应用程序中,它总是大于硬件cpu_cores

VisualVM 截图:

在此处输入图片说明

很困惑,我已经在互联网上搜索了一半,但找不到答案。

我的配置:

Runtime.getRuntime().availableProcessors()=12

java.util.concurrent.ForkJoinPool.common.parallelism=null(默认设置)

我的代码:

            final CountDownLatch countDownLatch = new CountDownLatch(tempList.size());
            tempList.parallelStream().forEach(om -> {
                countDownLatch.countDown();
                redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
            });
            countDownLatch.await();
Run Code Online (Sandbox Code Playgroud)

另外,我尝试过自定义池设置,但它也不起作用-

ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {  
    tempList.parallelStream().forEach(om -> {
        countDownLatch.countDown();
        redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
    }).get();
});
Run Code Online (Sandbox Code Playgroud)

一些信息:https : //docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html Java 8 并行流中的自定义线程池

Flo*_*ann 5

并行ForkJoinPool度不是池中的最大线程数。它是活动线程的目标。如果某些线程被阻塞,则池可能会启动新线程以达到所需的并行度级别。

来自 ForkJoinPool 的文档:

池尝试通过动态添加、挂起或恢复内部工作线程来维持足够的活动(或可用)线程,即使某些任务在等待加入其他任务时暂停。但是,面对阻塞的 I/O 或其他非托管同步,不能保证进行此类调整。嵌套的 ForkJoinPool.ManagedBlocker 接口支持扩展所容纳的同步类型。

屏幕截图显示,当其他线程切换到状态Monitor(粉红色的)时,新线程恰好在同一时间启动。我的猜测是该 redisReBloomService.add(…)调用ManagedBlocker在必须等待该监视器时在内部使用 a ,从而导致池启动更多工作线程。

这是一个小示例,ManagedBlocker它演示了您观察到的类似行为。当ManagedBlocker休眠 1 秒时,通常可以在 VisualVM 中观察到一个新的工作线程。

public class ForkJoinPoolTest {

    @Test
    public void testManagedBlocker() throws InterruptedException {
        // wait to be able to connect with VisualVM
        Thread.sleep(10_000);

        IntStream.range(0, 100).parallel().peek(number -> {
                doWork();

                // Run a managed blocker some times.
                // Every time it blocks, a new worker thread might be started.
                if (ThreadLocalRandom.current().nextInt(10) == 0) {
                    try {
                        ForkJoinPool.managedBlock(new ManagedBlocker() {
                            @Override
                            public boolean block() throws InterruptedException {
                                Thread.sleep(1_000);
                                return true;
                            }

                            @Override
                            public boolean isReleasable() {
                                return false;
                            }
                        });
                    } catch (InterruptedException ignored) { }
                }
            })
            .sum();
    }

    /** Some CPU bound workload **/
    void doWork() {
        for (int i = 0; i < 1_000_000; i++) {
            Math.random();
        }
    }
}

Run Code Online (Sandbox Code Playgroud)