Hol*_*ine 42 java parallel-processing fork-join threadpool
据我所知ForkJoinPool,该池创建了固定数量的线程(默认值:核心数),并且永远不会创建更多线程(除非应用程序通过使用表明需要这些线程managedBlock).
但是,使用ForkJoinPool.getPoolSize()我发现在创建30,000个任务(RecursiveAction)的程序中,ForkJoinPool执行这些任务平均使用700个线程(每次创建任务时计算的线程数).任务不做I/O,而是纯粹的计算; 唯一的任务间同步是调用ForkJoinTask.join()和访问AtomicBooleans,即没有线程阻塞操作.
因为join()不会像我理解的那样阻塞调用线程,所以没有理由为什么池中的任何线程都应该阻塞,所以(我曾经假设)应该没有理由创建任何进一步的线程(这显然发生了) .
那么,为什么要ForkJoinPool创建这么多线程呢?哪些因素决定了创建的线程数?
我曾希望这个问题可以在不发布代码的情况下得到解答,但在此请求.此代码摘自四倍大小的程序,简化为必要部分; 它不会按原样编译.如果需要,我当然也可以发布完整的程序.
程序使用深度优先搜索在迷宫中搜索从给定起点到给定终点的路径.保证存在解决方案.主要逻辑在以下compute()方法中SolverTask:A RecursiveAction从某个给定点开始,并继续从当前点可到达的所有邻居点.它不是SolverTask在每个分支点创建一个新的(这将创建太多的任务),而是将除了一个之外的所有邻居推送到后退堆栈以便稍后处理,并继续只有一个邻居没有被推送到堆栈.一旦它以这种方式达到死胡同,就会弹出最近推到回溯堆栈的点,并从那里继续搜索(相应地减少从taks起点构建的路径).一旦任务发现其回溯堆栈大于某个阈值,就会创建一个新任务; 从那时起,任务在继续从其回溯堆栈中弹出直到耗尽时,在到达分支点时不会将任何其他点推到其堆栈,而是为每个这样的点创建一个新任务.因此,可以使用堆栈限制阈值来调整任务的大小.
我上面引用的数字("30,000个任务,平均700个线程")来自于搜索5000x5000个单元格的迷宫.所以,这是基本代码:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;
/**
* @return Tries to compute a path through the maze from local start to end
* and returns that (or null if no such path found)
*/
@Override
public ArrayDeque<Point> compute() {
// Is this task still accepting new branches for processing on its own,
// or will it create new tasks to handle those?
boolean stillAcceptingNewBranches = true;
Point current = localStart;
ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>(); // Path from localStart to (including) current
ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
// Used as a stack: Branches not yet taken; solver will backtrack to these branching points later
Direction[] allDirections = Direction.values();
while (!current.equals(end)) {
pathFromLocalStart.addLast(current);
// Collect current's unvisited neighbors in random order:
ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);
for (Direction directionToNeighbor: allDirections) {
Point neighbor = current.getNeighbor(directionToNeighbor);
// contains() and hasPassage() are read-only methods and thus need no synchronization
if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
}
// Process unvisited neighbors
if (neighborsToVisit.size() == 1) {
// Current node is no branch: Continue with that neighbor
current = neighborsToVisit.getFirst().getPoint();
continue;
}
if (neighborsToVisit.size() >= 2) {
// Current node is a branch
if (stillAcceptingNewBranches) {
current = neighborsToVisit.removeLast().getPoint();
// Push all neighbors except one on the backtrack stack for later processing
for(PointAndDirection neighborAndDirection: neighborsToVisit)
backtrackStack.push(neighborAndDirection);
if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
stillAcceptingNewBranches = false;
// Continue with the one neighbor that was not pushed onto the backtrack stack
continue;
} else {
// Current node is a branch point, but this task does not accept new branches any more:
// Create new task for each neighbor to visit and wait for the end of those tasks
SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
int t = 0;
for(PointAndDirection neighborAndDirection: neighborsToVisit) {
SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
task.fork();
subTasks[t++] = task;
}
for (SolverTask task: subTasks) {
ArrayDeque<Point> subTaskResult = null;
try {
subTaskResult = task.join();
} catch (CancellationException e) {
// Nothing to do here: Another task has found the solution and cancelled all other tasks
}
catch (Exception e) {
e.printStackTrace();
}
if (subTaskResult != null) { // subtask found solution
pathFromLocalStart.addAll(subTaskResult);
// No need to wait for the other subtasks once a solution has been found
return pathFromLocalStart;
}
} // for subTasks
} // else (not accepting any more branches)
} // if (current node is a branch)
// Current node is dead end or all its neighbors lead to dead ends:
// Continue with a node from the backtracking stack, if any is left:
if (backtrackStack.isEmpty()) {
return null; // No more backtracking avaible: No solution exists => end of this task
}
// Backtrack: Continue with cell saved at latest branching point:
PointAndDirection pd = backtrackStack.pop();
current = pd.getPoint();
Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
// DEBUG System.out.println("Backtracking to " + branchingPoint);
// Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
// DEBUG System.out.println(" Going back before " + pathSoFar.peekLast());
pathFromLocalStart.removeLast();
}
// continue while loop with newly popped current
} // while (current ...
if (!current.equals(end)) {
// this task was interrupted by another one that already found the solution
// and should end now therefore:
return null;
} else {
// Found the solution path:
pathFromLocalStart.addLast(current);
return pathFromLocalStart;
}
} // compute()
} // class SolverTask
@SuppressWarnings("serial")
public class ParallelMaze {
// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;
/**
* Atomically marks this point as visited unless visited before
* @return whether the point was visited for the first time, i.e. whether it could be marked
*/
boolean visit(Point p) {
return visited[p.getX()][p.getY()].compareAndSet(false, true);
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
// Start initial task
long startTime = System.currentTimeMillis();
// since SolverTask.compute() expects its starting point already visited,
// must do that explicitly for the global starting point:
maze.visit(maze.start);
maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
// One solution is enough: Stop all tasks that are still running
pool.shutdownNow();
pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
long endTime = System.currentTimeMillis();
System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " +
width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
Run Code Online (Sandbox Code Playgroud)
elu*_*ode 16
有关stackoverflow的相关问题:
ForkJoinPool在invokeAll/join期间停止
我制作了一个可运行的精简版本(正在使用的jvm参数:-Xms256m -Xmx1024m -Xss8m):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
public class Test1 {
private static ForkJoinPool pool = new ForkJoinPool(2);
private static class SomeAction extends RecursiveAction {
private int counter; //recursive counter
private int childrenCount=80;//amount of children to spawn
private int idx; // just for displaying
private SomeAction(int counter, int idx) {
this.counter = counter;
this.idx = idx;
}
@Override
protected void compute() {
System.out.println(
"counter=" + counter + "." + idx +
" activeThreads=" + pool.getActiveThreadCount() +
" runningThreads=" + pool.getRunningThreadCount() +
" poolSize=" + pool.getPoolSize() +
" queuedTasks=" + pool.getQueuedTaskCount() +
" queuedSubmissions=" + pool.getQueuedSubmissionCount() +
" parallelism=" + pool.getParallelism() +
" stealCount=" + pool.getStealCount());
if (counter <= 0) return;
List<SomeAction> list = new ArrayList<>(childrenCount);
for (int i=0;i<childrenCount;i++){
SomeAction next = new SomeAction(counter-1,i);
list.add(next);
next.fork();
}
for (SomeAction action:list){
action.join();
}
}
}
public static void main(String[] args) throws Exception{
pool.invoke(new SomeAction(2,0));
}
}
Run Code Online (Sandbox Code Playgroud)
显然,当您执行连接时,当前线程会看到所需任务尚未完成,并为自己执行另一项任务.
它发生在java.util.concurrent.ForkJoinWorkerThread#joinTask.
但是,这个新任务会产生更多相同的任务,但是它们无法在池中找到线程,因为线程在连接中被锁定.而且由于它无法知道释放它们需要多长时间(线程可能处于无限循环或永远死锁),所以会产生新的线程(如路易斯·沃瑟曼所述,补偿连接的线程)):java.util.concurrent.ForkJoinPool#signalWork
因此,为了防止这种情况,您需要避免递归生成任务.
例如,如果在上面的代码中将初始参数设置为1,则活动线程数量将为2,即使您将childrenCount增加十倍.
还要注意,虽然活动线程数量增加,但运行线程数量小于或等于并行度.
Lou*_*man 10
来源评论:
补偿:除非已经有足够的活动线程,否则方法tryPreBlock()可以创建或重新激活备用线程,以补偿阻塞的加入者,直到它们解除阻塞.
我认为正在发生的事情是你没有很快完成任何任务,并且由于在提交新任务时没有可用的工作线程,因此会创建一个新线程.
严格,完全严格和最终严格的与处理有向无环图(DAG)有关.您可以谷歌这些条款,以充分了解它们.这是框架旨在处理的处理类型.查看API for Recursive ...中的代码,框架依赖于您的compute()代码来执行其他compute()链接,然后执行join().每个任务都像处理DAG一样执行单个连接().
您没有进行DAG处理.您正在分配许多新任务并在每个任务上等待(join()).阅读源代码.它非常复杂,但你可能能够弄明白.该框架没有做适当的任务管理.当它执行join()时,它将把等待的任务放在哪里?没有挂起的队列,需要监视器线程不断查看队列以查看完成的内容.这就是框架使用"延续线程"的原因.当一个任务确实加入()时,框架假设它正在等待单个较低的任务完成.当存在许多join()方法时,线程无法继续,因此需要存在辅助或继续线程.
如上所述,您需要一个分散 - 聚集类型的fork-join过程.你可以在那里分叉任务
小智 5
Holger Peine和elusive-code发布的两个代码片段实际上并不遵循1.8 版本的 javadoc中出现的推荐实践:
在最典型的用法中,fork-join 对的作用类似于并行递归函数的调用(fork)和返回(join)。与其他形式的递归调用的情况一样,返回(连接)应该先最内层执行。例如, a.fork(); b.fork(); b.加入(); a.join(); 可能比在代码b之前加入代码a更有效。
在这两种情况下,FJPool 都是通过默认构造函数实例化的。这会导致使用asyncMode=false构建池,这是默认值:
@param asyncMode 如果为 true,
则为从未加入的分叉任务建立本地先进先出调度模式。在工作线程仅处理事件式异步任务的应用程序中,此模式可能比默认的基于本地堆栈的模式更合适。对于默认值,请使用 false。
这样工作队列实际上是 lifo:
head -> | t4 | t3 | t2 | t1 | ... | <- 尾巴
因此,在片段中,它们fork()所有任务将它们推送到堆栈上 ,然后以相同的顺序join(),即从最深的任务 (t1) 到最上面的任务 (t4) 有效地阻塞,直到其他线程窃取 (t1),然后 (t2) ) 等等。由于有足够的任务来阻止所有池线程 (task_count >> pool.getParallelism()),因此补偿将按照Louis Wasserman 的描述进行。
| 归档时间: |
|
| 查看次数: |
16706 次 |
| 最近记录: |