为什么并行流不使用ForkJoinPool的所有线程?

Pab*_*mez 14 java concurrency multithreading java-8 java-stream

所以我知道,如果你使用parallelStream没有自定义ForkJoinPool,它将使用默认的ForkJoinPool,默认情况下,只有一个线程,因为你有处理器.

因此,如此处所述(以及该问题的另一个答案)为了获得更多的并行性,您必须:

将并行流执行提交给您自己的ForkJoinPool:yourFJP.submit(() - > stream.parallel().forEach(doSomething));

所以,我这样做了:

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}
Run Code Online (Sandbox Code Playgroud)

我创建了一组线程名称,以便查看正在创建的线程数,并记录了池所具有的活动线程数,并且这两个数字的长度都不超过16,这意味着这里的并行性是不超过16(为什么甚至16?).如果我不使用forkJoinPool,我会得到4作为并行性,这取决于我拥有的处理器数量.

为什么它给我16而不是1000?

Dim*_*rov 11

更新

最初这个答案是一个精心设计的解释,声称ForkJoinPool应用背压并且甚至没有达到规定的并行度水平,因为总有空闲工作人员可以处理流.

那是不对的.

实际答案在原始问题中提供,其被标记为重复 - 使用自定义ForkJoinPool进行流处理不受官方支持,并且在使用时forEach,默认池并行性用于确定流分裂器行为.

下面是一个示例,当手动将任务提交到自定义时ForkJoinPool,池的活动线程数很容易达到其并行度级别:

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}
Run Code Online (Sandbox Code Playgroud)

感谢Stuart Marks指出这一点并向Sotirios Delimanolis辩解我的原始答案是错误的:)


edh*_*ned -5

在我看来,当您向 FJP 提交 lambda 时,lambda 将使用公共池而不是 FJP。Sotirios Delimanolis 通过他的上述评论证明了这一点。您提交的是在您的 FJP 中运行的任务。

尝试分析此代码以查看实际使用的线程。

无法命名 FJP 中的线程。

  • 不,不。也许我的评论措辞不正确。该任务应该在提交它的任何“ForkJoinPool”实例的上下文中运行。它通过查看当前正在运行的线程的类型来做到这一点。如果它的类型是“ForkJoinWorkerThread”,那么它就可以访问创建它的“ForkJoinPool”。它可以使用它来提交其他并行任务。您可以通过提供适当的线程工厂来命名自定义“ForkJoinPool”的线程。 (3认同)