默认情况下,parallelStream 中的 commonPool 的大小应该是cpu_cores - 1.
但是,在我的应用程序中,它总是大于硬件cpu_cores。
VisualVM 截图:
很困惑,我已经在互联网上搜索了一半,但找不到答案。
我的配置:
Runtime.getRuntime().availableProcessors()=12
java.util.concurrent.ForkJoinPool.common.parallelism=null(默认设置)
我的代码:
final CountDownLatch countDownLatch = new CountDownLatch(tempList.size());
tempList.parallelStream().forEach(om -> {
countDownLatch.countDown();
redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
});
countDownLatch.await();
Run Code Online (Sandbox Code Playgroud)
另外,我尝试过自定义池设置,但它也不起作用-
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
forkJoinPool.submit(() -> {
tempList.parallelStream().forEach(om -> {
countDownLatch.countDown();
redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
}).get();
});
Run Code Online (Sandbox Code Playgroud)
一些信息:https : //docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html Java 8 并行流中的自定义线程池
并行ForkJoinPool度不是池中的最大线程数。它是活动线程的目标。如果某些线程被阻塞,则池可能会启动新线程以达到所需的并行度级别。
来自 ForkJoinPool 的文档:
池尝试通过动态添加、挂起或恢复内部工作线程来维持足够的活动(或可用)线程,即使某些任务在等待加入其他任务时暂停。但是,面对阻塞的 I/O 或其他非托管同步,不能保证进行此类调整。嵌套的 ForkJoinPool.ManagedBlocker 接口支持扩展所容纳的同步类型。
屏幕截图显示,当其他线程切换到状态Monitor(粉红色的)时,新线程恰好在同一时间启动。我的猜测是该 redisReBloomService.add(…)调用ManagedBlocker在必须等待该监视器时在内部使用 a ,从而导致池启动更多工作线程。
这是一个小示例,ManagedBlocker它演示了您观察到的类似行为。当ManagedBlocker休眠 1 秒时,通常可以在 VisualVM 中观察到一个新的工作线程。
public class ForkJoinPoolTest {
@Test
public void testManagedBlocker() throws InterruptedException {
// wait to be able to connect with VisualVM
Thread.sleep(10_000);
IntStream.range(0, 100).parallel().peek(number -> {
doWork();
// Run a managed blocker some times.
// Every time it blocks, a new worker thread might be started.
if (ThreadLocalRandom.current().nextInt(10) == 0) {
try {
ForkJoinPool.managedBlock(new ManagedBlocker() {
@Override
public boolean block() throws InterruptedException {
Thread.sleep(1_000);
return true;
}
@Override
public boolean isReleasable() {
return false;
}
});
} catch (InterruptedException ignored) { }
}
})
.sum();
}
/** Some CPU bound workload **/
void doWork() {
for (int i = 0; i < 1_000_000; i++) {
Math.random();
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
297 次 |
| 最近记录: |