据我所知ForkJoinPool,该池创建了固定数量的线程(默认值:核心数),并且永远不会创建更多线程(除非应用程序通过使用表明需要这些线程managedBlock).
但是,使用ForkJoinPool.getPoolSize()我发现在创建30,000个任务(RecursiveAction)的程序中,ForkJoinPool执行这些任务平均使用700个线程(每次创建任务时计算的线程数).任务不做I/O,而是纯粹的计算; 唯一的任务间同步是调用ForkJoinTask.join()和访问AtomicBooleans,即没有线程阻塞操作.
因为join()不会像我理解的那样阻塞调用线程,所以没有理由为什么池中的任何线程都应该阻塞,所以(我曾经假设)应该没有理由创建任何进一步的线程(这显然发生了) .
那么,为什么要ForkJoinPool创建这么多线程呢?哪些因素决定了创建的线程数?
我曾希望这个问题可以在不发布代码的情况下得到解答,但在此请求.此代码摘自四倍大小的程序,简化为必要部分; 它不会按原样编译.如果需要,我当然也可以发布完整的程序.
程序使用深度优先搜索在迷宫中搜索从给定起点到给定终点的路径.保证存在解决方案.主要逻辑在以下compute()方法中SolverTask:A RecursiveAction从某个给定点开始,并继续从当前点可到达的所有邻居点.它不是SolverTask在每个分支点创建一个新的(这将创建太多的任务),而是将除了一个之外的所有邻居推送到后退堆栈以便稍后处理,并继续只有一个邻居没有被推送到堆栈.一旦它以这种方式达到死胡同,就会弹出最近推到回溯堆栈的点,并从那里继续搜索(相应地减少从taks起点构建的路径).一旦任务发现其回溯堆栈大于某个阈值,就会创建一个新任务; 从那时起,任务在继续从其回溯堆栈中弹出直到耗尽时,在到达分支点时不会将任何其他点推到其堆栈,而是为每个这样的点创建一个新任务.因此,可以使用堆栈限制阈值来调整任务的大小.
我上面引用的数字("30,000个任务,平均700个线程")来自于搜索5000x5000个单元格的迷宫.所以,这是基本代码:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000; …Run Code Online (Sandbox Code Playgroud) 鉴于不鼓励在Java EE容器中生成线程.是否也不鼓励在Java EE中使用可能产生线程的Java 8并行流?
我正在创建一个从200万开始的无限整数流,使用朴素素数测试实现过滤此流以生成负载并将结果限制为10.
Predicate<Integer> isPrime = new Predicate<Integer>() {
@Override
public boolean test(Integer n) {
for (int i = 2; i < n; i++) {
if (n % i == 0) return false;
}
return true;
}
};
Stream.iterate(200_000_000, n -> ++n)
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
Run Code Online (Sandbox Code Playgroud)
这按预期工作.
现在,如果我在过滤之前添加对parallel()的调用,则不会产生任何内容并且处理无法完成.
Stream.iterate(200_000_000, n -> ++n)
.parallel()
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
Run Code Online (Sandbox Code Playgroud)
有人能指出我在这里发生的事情的正确方向吗?
编辑:我不是在寻找更好的素性测试实现(它旨在成为一个长期运行的实现),而是为了解释使用并行流的负面影响.
Stream返回者map或mapToObj方法总是顺序的,还是取决于调用流的状态是否是并行的?
文档IntStream没有明确回答这个问题,或者我无法理解它:
我想知道来自以下示例的流是否会并行到最后,或者它会在某个时刻发生变化.
IntStream.range(1, array_of_X.size())
.parallel()
.mapToObj (index -> array_of_X.get(index)) // mapping#1
.filter (filter_X)
.map (X_to_Y) //mapping#2
.filter (filter_Y)
.mapToInt (obj_Y_to_int) //mapping#3
.collect(value -> Collectors.summingInt(value));
Run Code Online (Sandbox Code Playgroud)