ForkJoinPool性能Java 8 vs 11

Pan*_*ant 4 java multithreading java-8 java-11

考虑以下代码:

package com.sarvagya;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class Streamer {
    private static final int LOOP_COUNT  = 2000;
    public static void main(String[] args){
        try{
            for(int i = 0; i < LOOP_COUNT; ++i){
                poolRunner();
                System.out.println("done loop " + i);
                try{
                    Thread.sleep(50L);
                }
                catch (Exception e){
                    System.out.println(e);
                }
            }
        }
        catch (ExecutionException | InterruptedException e){
            System.out.println(e);
        }

        // Add a delay outside the loop to make sure all daemon threads are cleared before main exits.
        try{
            Thread.sleep(10 * 60 * 1000L);
        }
        catch (Exception e){
            System.out.println(e);
        }
    }

    /**
     * poolRunner method.
     * Assume I don't have any control over this method e.g. done by some library.
     * @throws InterruptedException
     * @throws ExecutionException
     */
    private static void poolRunner() throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(() ->{
            List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10, 11,12,14,15,16);
            List<Integer> collect = numbers.stream()
                    .parallel()
                    .filter(xx -> xx > 5)
                    .collect(Collectors.toList());
            System.out.println(collect);
        }).get();
    }
}
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,poolRunner方法是创建一个ForkJoinPool并向其提交一些任务.当使用Java 8并将LOOP_COUNT保持为2000时,我们可以看到创建的最大线程大约为3600,如下所示 分析信息 图:分析

JDK 8中的最大线程数 图:线程信息.

经过一段时间后,所有这些线程都会下降到接近10.但是,在OpenJDK 11中保持相同的LOOP_COUNT会产生以下错误:

[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
Exception in thread "ForkJoinPool-509-worker-5" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
    at java.base/java.lang.Thread.start0(Native Method)
    at java.base/java.lang.Thread.start(Thread.java:803)
    at java.base/java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1329)
    at java.base/java.util.concurrent.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1352)
    at java.base/java.util.concurrent.ForkJoinPool.signalWork(ForkJoinPool.java:1476)
    at java.base/java.util.concurrent.ForkJoinPool.deregisterWorker(ForkJoinPool.java:1458)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
Run Code Online (Sandbox Code Playgroud)

它很快达到最大线程限制.将LOOP_COUNT保持为500,工作正常,但是,这些线程非常缓慢地清除并达到大约500个线程的平台.见下图:

OpenJDK 11中的线程信息 图:OpenJDK 11中的线程信息

资料信息 图:OpenJDK中的分析11

线程在JDK 8中是PARKED,但在JDK 11中是WAIT.在Java 11中也应该减少守护程序线程的数量,但是它很慢并且不能按预期工作.此外,假设我无法控制poolRunner方法.考虑这种方法是由一些外部库提供的.

这是OpenJDK 11的问题,还是我在代码中做错了什么.谢谢.

Ste*_*n C 6

你做错了.

在上面的代码中,我正在创建一个ForkJoinPool并向其提交一些任务.

实际上,你正在创建2000个ForkJoinPool实例......

您应该创建一个ForkJoinPool具有适合当前任务的并行度(即线程数)的单个而不是这样做.

创建大量(即数千个)线程是一个非常糟糕的主意.即使您可以在不触发OOME的情况下执行此操作,您将消耗大量的堆栈和堆内存,并在调度程序和垃圾收集器上放置大量负载......没有任何实际好处.

  • 代码不仅创建了大量的池,它在使用后忘记在每个池上调用`shutdown()`,因此规范中没有任何内容保证工作线程将终止.因此对于16核机器,它可以创建32000个永不濒临死亡的线程,所有这些都在规范范围内. (4认同)
  • 有一个原因我们没有启用并行流在指定的池中运行,而是总是在公共池中运行 - 为了防止作者已采取极端措施的那种自己在脚下的射击安排在这里."poolRunner()"的作者认为他们通过"超越"运行时,以及从他们不理解的堆栈溢出中剪切和粘贴"变通方法"代码来变得聪明.这段代码终结了; 你唯一的选择就是不要打电话. (4认同)

Hol*_*ger 6

您的代码正在创建大量ForkJoinPool实例,并且shutdown()在使用后永远不会调用任何池。由于在Java 8中,规范中没有任何内容可以保证工作线程将终止,因此该代码甚至可能以2000(?个池数?)倍?核数?线程。

在实践中,观察到的行为是由于两秒未记录的空闲超时引起的。请注意,根据评论,超时的结果是试图减少工作人员的数量,这与终止工作不同。因此,如果n个线程遇到超时,则不是所有n个线程都终止,而是将线程数减少1,其余线程可能会再次等待。此外,短语“初始超时值”已经暗示了它,实际超时每次发生时都会增加。因此,由于此(未记录)超时,n个空闲的工作线程终止需要花费几秒钟的时间。n * (n + 1)

从Java 9开始,有一个可配置的keepAliveTime,可以在的新构造函数中指定ForkJoinPool,该文档还记录了默认值:

keepAliveTime
从上次使用到终止线程(然后如果需要,以后再替换)之前经过的时间。对于默认值,请使用60, TimeUnit.SECONDS

该文档可能会误以为现在所有工作线程在为keepAliveTime空闲时可能会一起终止,但是实际上,仍然存在一次仅将池缩小一个的行为,尽管现在时间没有增加。因此,现在,n个空闲工作线程终止最多需要几秒钟的时间。由于以前的行为是未指定的,因此它甚至不是不兼容的。60 * n

必须强调的是,即使具有相同的超时行为,最终产生的最大线程数也可能会发生变化,因为具有更好代码优化的新JVM减少了实际操作的执行时间(无需人为插入Thread.sleep(…)),它将更快地创建新线程。而终止仍然受制于时钟时间。


结论是,当您知道不再需要线程池时,就永远不要依赖自动工作线程终止。相反,您应该shutdown()在完成后致电。


您可以使用以下代码验证行为:

int threadNumber = 8;
ForkJoinPool pool = new ForkJoinPool(threadNumber);
// force the creation of all worker threads
pool.invokeAll(Collections.nCopies(threadNumber*2, () -> { Thread.sleep(500); return ""; }));
int oldNum = pool.getPoolSize();
System.out.println(oldNum+" threads; waiting for dying threads");
long t0 = System.nanoTime();
while(oldNum > 0) {
    while(pool.getPoolSize()==oldNum)
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
    long t1 = System.nanoTime();
    oldNum = pool.getPoolSize();
    System.out.println(threadNumber-oldNum+" threads terminated after "
        +TimeUnit.NANOSECONDS.toSeconds(t1 - t0)+"s");
}
Run Code Online (Sandbox Code Playgroud) Java 8:
int threadNumber = 8;
ForkJoinPool pool = new ForkJoinPool(threadNumber);
// force the creation of all worker threads
pool.invokeAll(Collections.nCopies(threadNumber*2, () -> { Thread.sleep(500); return ""; }));
int oldNum = pool.getPoolSize();
System.out.println(oldNum+" threads; waiting for dying threads");
long t0 = System.nanoTime();
while(oldNum > 0) {
    while(pool.getPoolSize()==oldNum)
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
    long t1 = System.nanoTime();
    oldNum = pool.getPoolSize();
    System.out.println(threadNumber-oldNum+" threads terminated after "
        +TimeUnit.NANOSECONDS.toSeconds(t1 - t0)+"s");
}
Run Code Online (Sandbox Code Playgroud) Java 11:
8 threads; waiting for dying threads
1 threads terminated after 2s
2 threads terminated after 6s
3 threads terminated after 12s
4 threads terminated after 20s
5 threads terminated after 30s
6 threads terminated after 42s
7 threads terminated after 56s
8 threads terminated after 72s
Run Code Online (Sandbox Code Playgroud)

从未完成,显然,至少有最后一个工作线程处于活动状态