dab*_*aba 6 java concurrency multithreading threadpool cyclicbarrier
我正在运行一些并行处理的测试,并制作了一个程序,给出一个整数矩阵,根据邻居重新计算每个位置的值.
我需要一个矩阵的副本,因此CyclicBarrier一旦部分问题得到解决,值将不会被覆盖并用于合并结果:
CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks + 1, new Runnable() {
public void run() {
ParallelProcess.mergeResult();
}
});
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, r_cols); // init
Run Code Online (Sandbox Code Playgroud)
每个任务都分配了矩阵的一部分:我将它按行等分.但是可能会发生这样的划分并不精确,因此会有一小部分对应于不会提交给线程池的持续行.
示例:如果我有16行且n_tasks = 4没有问题,则所有4个都将提交到池中.但如果我有18,那么前16个将被提交,但不是最后两个.
如果发生这种情况,我会强制提交.好吧,我实际上并没有提交,因为我使用的是我创建的固定线程池ExecutorService e = Executors.newFixedThreadPool(n_tasks).由于池中的所有插槽都被占用,并且线程被屏障阻塞(mybarrier.await()在run方法中调用),我无法将其提交到池中,所以我只是使用了Thread.start().
让我们谈谈这一点.由于我需要考虑CyclicBarrier剩余块的可能性,因此当事方的数量必须增加1.
但如果这种情况没有发生,我将成为一方短路触发障碍.
我的解决方案是什么?:
if (lower_limit != n_rows) { // the remaining chunk to be processed
Thread t = new Thread(new ParallelProcess(lower_limit, n_rows));
t.start();
t.join();
}
else {
cyclic_barrier.await();
}
Run Code Online (Sandbox Code Playgroud)
我觉得我在欺骗时使用这个cyclic_barrier.await()技巧来强行提升障碍.
有没有其他方法可以解决这个问题,所以我不必做我正在做的事情?
虽然这不能回答您有关 CyclicBarriers 的问题,但我可以建议使用 Phaser吗?这确实能够包含参与方的数量,并且还允许您mergeResult在某个阶段跳闸时运行。
因此,在执行异步计算之前,只需register. 然后在该计算中让线程到达phaser. 当所有线程都到达时,它将推进该阶段并可以调用重写方法onAdvance。
提交内容:
ParallelProcess process = new ParallelProcess(lower_limit, n_rows));
phaser.register();
executor.submit(process);
Run Code Online (Sandbox Code Playgroud)
处理器
public void run(){
//do stuff
phaser.arrive();
}
Run Code Online (Sandbox Code Playgroud)
移相器
Phaser phaser = new Phaser(){
protected boolean onAdvance(int phase, int registeredParties) {
ParallelProcess.mergeResult();
return true;
}
}
Run Code Online (Sandbox Code Playgroud)