Pan*_*ant 4 java multithreading java-8 java-11
考虑以下代码:
package com.sarvagya;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
public class Streamer {
private static final int LOOP_COUNT = 2000;
public static void main(String[] args){
try{
for(int i = 0; i < LOOP_COUNT; ++i){
poolRunner();
System.out.println("done loop " + i);
try{
Thread.sleep(50L);
}
catch (Exception e){
System.out.println(e);
}
}
}
catch (ExecutionException | InterruptedException e){
System.out.println(e);
}
// Add a delay outside the loop to make sure all daemon threads are cleared before main exits.
try{
Thread.sleep(10 * 60 * 1000L);
}
catch (Exception e){
System.out.println(e);
}
}
/**
* poolRunner method.
* Assume I don't have any control over this method e.g. done by some library.
* @throws InterruptedException
* @throws ExecutionException
*/
private static void poolRunner() throws InterruptedException, ExecutionException {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(() ->{
List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10, 11,12,14,15,16);
List<Integer> collect = numbers.stream()
.parallel()
.filter(xx -> xx > 5)
.collect(Collectors.toList());
System.out.println(collect);
}).get();
}
}
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,poolRunner方法是创建一个ForkJoinPool并向其提交一些任务.当使用Java 8并将LOOP_COUNT保持为2000时,我们可以看到创建的最大线程大约为3600,如下所示
图:分析
经过一段时间后,所有这些线程都会下降到接近10.但是,在OpenJDK 11中保持相同的LOOP_COUNT会产生以下错误:
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
Exception in thread "ForkJoinPool-509-worker-5" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
at java.base/java.lang.Thread.start0(Native Method)
at java.base/java.lang.Thread.start(Thread.java:803)
at java.base/java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1329)
at java.base/java.util.concurrent.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1352)
at java.base/java.util.concurrent.ForkJoinPool.signalWork(ForkJoinPool.java:1476)
at java.base/java.util.concurrent.ForkJoinPool.deregisterWorker(ForkJoinPool.java:1458)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
Run Code Online (Sandbox Code Playgroud)
它很快达到最大线程限制.将LOOP_COUNT保持为500,工作正常,但是,这些线程非常缓慢地清除并达到大约500个线程的平台.见下图:
线程在JDK 8中是PARKED,但在JDK 11中是WAIT.在Java 11中也应该减少守护程序线程的数量,但是它很慢并且不能按预期工作.此外,假设我无法控制poolRunner方法.考虑这种方法是由一些外部库提供的.
这是OpenJDK 11的问题,还是我在代码中做错了什么.谢谢.
你做错了.
在上面的代码中,我正在创建一个
ForkJoinPool并向其提交一些任务.
实际上,你正在创建2000个ForkJoinPool实例......
您应该创建一个ForkJoinPool具有适合当前任务的并行度(即线程数)的单个而不是这样做.
创建大量(即数千个)线程是一个非常糟糕的主意.即使您可以在不触发OOME的情况下执行此操作,您将消耗大量的堆栈和堆内存,并在调度程序和垃圾收集器上放置大量负载......没有任何实际好处.
您的代码正在创建大量ForkJoinPool实例,并且shutdown()在使用后永远不会调用任何池。由于在Java 8中,规范中没有任何内容可以保证工作线程将终止,因此该代码甚至可能以2000(?个池数?)倍?核数?线程。
在实践中,观察到的行为是由于两秒未记录的空闲超时引起的。请注意,根据评论,超时的结果是试图减少工作人员的数量,这与终止工作不同。因此,如果n个线程遇到超时,则不是所有n个线程都终止,而是将线程数减少1,其余线程可能会再次等待。此外,短语“初始超时值”已经暗示了它,实际超时每次发生时都会增加。因此,由于此(未记录)超时,n个空闲的工作线程终止需要花费几秒钟的时间。n * (n + 1)
从Java 9开始,有一个可配置的keepAliveTime,可以在的新构造函数中指定ForkJoinPool,该文档还记录了默认值:
keepAliveTime- 从上次使用到终止线程(然后如果需要,以后再替换)之前经过的时间。对于默认值,请使用
60, TimeUnit.SECONDS。
该文档可能会误以为现在所有工作线程在为keepAliveTime空闲时可能会一起终止,但是实际上,仍然存在一次仅将池缩小一个的行为,尽管现在时间没有增加。因此,现在,n个空闲工作线程终止最多需要几秒钟的时间。由于以前的行为是未指定的,因此它甚至不是不兼容的。60 * n
必须强调的是,即使具有相同的超时行为,最终产生的最大线程数也可能会发生变化,因为具有更好代码优化的新JVM减少了实际操作的执行时间(无需人为插入Thread.sleep(…)),它将更快地创建新线程。而终止仍然受制于时钟时间。
结论是,当您知道不再需要线程池时,就永远不要依赖自动工作线程终止。相反,您应该shutdown()在完成后致电。
您可以使用以下代码验证行为:
int threadNumber = 8;
ForkJoinPool pool = new ForkJoinPool(threadNumber);
// force the creation of all worker threads
pool.invokeAll(Collections.nCopies(threadNumber*2, () -> { Thread.sleep(500); return ""; }));
int oldNum = pool.getPoolSize();
System.out.println(oldNum+" threads; waiting for dying threads");
long t0 = System.nanoTime();
while(oldNum > 0) {
while(pool.getPoolSize()==oldNum)
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
long t1 = System.nanoTime();
oldNum = pool.getPoolSize();
System.out.println(threadNumber-oldNum+" threads terminated after "
+TimeUnit.NANOSECONDS.toSeconds(t1 - t0)+"s");
}
Run Code Online (Sandbox Code Playgroud)
Java 8:
int threadNumber = 8;
ForkJoinPool pool = new ForkJoinPool(threadNumber);
// force the creation of all worker threads
pool.invokeAll(Collections.nCopies(threadNumber*2, () -> { Thread.sleep(500); return ""; }));
int oldNum = pool.getPoolSize();
System.out.println(oldNum+" threads; waiting for dying threads");
long t0 = System.nanoTime();
while(oldNum > 0) {
while(pool.getPoolSize()==oldNum)
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
long t1 = System.nanoTime();
oldNum = pool.getPoolSize();
System.out.println(threadNumber-oldNum+" threads terminated after "
+TimeUnit.NANOSECONDS.toSeconds(t1 - t0)+"s");
}
Run Code Online (Sandbox Code Playgroud)
Java 11:
8 threads; waiting for dying threads
1 threads terminated after 2s
2 threads terminated after 6s
3 threads terminated after 12s
4 threads terminated after 20s
5 threads terminated after 30s
6 threads terminated after 42s
7 threads terminated after 56s
8 threads terminated after 72s
Run Code Online (Sandbox Code Playgroud)
从未完成,显然,至少有最后一个工作线程处于活动状态
| 归档时间: |
|
| 查看次数: |
986 次 |
| 最近记录: |