Kov*_*nen 8 java parallel-processing concurrency synchronization java.util.concurrent
我有一个类似这样的程序
public class Test implements Runnable
{
public int local_counter
public static int global_counter
// Barrier waits for as many threads as we launch + main thread
public static CyclicBarrier thread_barrier = new CyclicBarrier (n_threads + 1);
/* Constructors etc. */
public void run()
{
for (int i=0; i<100; i++)
{
thread_barrier.await();
local_counter = 0;
for(int j=0 ; j = 20 ; j++)
local_counter++;
thread_barrier.await();
}
}
public void main()
{
/* Create and launch some threads, stored on thread_array */
for(int i=0 ; i<100 ; i++)
{
thread_barrier.await();
thread_barrier.await();
for (int t=1; t<thread_array.length; t++)
{
global_counter += thread_array[t].local_counter;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
基本上,我有几个线程与他们自己的本地计数器,我正在这样做(在循环中)
|----| | |----|
|main| | |pool|
|----| | |----|
|
-------------------------------------------------------
barrier (get local counters before they're overwritten)
-------------------------------------------------------
|
| 1. reset local counter
| 2. do some computations
| involving local counter
|
-------------------------------------------------------
barrier (synchronize all threads)
-------------------------------------------------------
|
1. update global counter |
using each thread's |
local counter |
Run Code Online (Sandbox Code Playgroud)
这一切都应该是好的和花花公子,但事实证明这不能很好地扩展.在16个物理节点集群上,6-8个线程之后的加速可以忽略不计,所以我必须摆脱其中一个等待.我已经尝试过可以扩展的CyclicBarrier,能够做到这一点的Semaphores,以及一个自定义库(jbarrier),它可以运行得很好,直到有比物理内核更多的线程,此时它的性能比顺序版本差.但我无法想出一种方法来做到这一点,而不是两次停止所有线程.
编辑:虽然我很欣赏你可能有关于我的程序中任何其他可能的瓶颈的所有和任何见解,我正在寻找关于这个特定问题的答案.如果需要,我可以提供更具体的示例
一些修复:假设您的线程数组 [0] 应该参与全局计数器总和,您对线程的迭代应该是 for(int t=0;...) 。我们可以猜测这是一个 Test 数组,而不是线程。local_counter 应该是易失性的,否则您可能看不到测试线程和主线程的真实值。
好吧,现在,你已经有了一个正确的两阶段循环,afaict。任何其他东西,比如移相器或在每个循环都有一个新的倒计时锁存器的 1 个循环屏障,都只是同一主题的变体:让大量线程同意让主线程恢复,并让主线程一次恢复多个线程。
更精简的实现可能涉及可重入锁、到达的测试线程的计数器、恢复所有测试线程上的测试的条件以及恢复主线程的条件。当 --count==0 时到达的测试线程应该发出主恢复条件信号。所有测试线程都等待测试恢复条件。main 应该将计数器重置为 N 并在测试恢复条件下发送 signalAll,然后等待 main 条件。线程(测试线程和主线程)每个循环仅等待一次。
最后,如果最终目标是任何线程更新的总和,您应该查看 LongAdder(如果不是 AtomicLong)来同时执行 long 加法,而不必停止所有线程(它们相互竞争并相加,不涉及主线程)。
否则,您可以让线程将其材料传递到主线程读取的阻塞队列。这样做的方式有太多;我很难理解为什么你挂起所有线程来收集数据。仅此而已。问题过于简单化,我们没有足够的约束来证明您所做的事情是合理的。
不用担心 CyclicBarrier,它是通过可重入锁、计数器和将 signalAll() 触发到所有等待线程的条件来实现的。这是严格编码的,事实上。如果您想要无锁版本,您将面临太多繁忙的自旋循环,浪费 CPU 时间,特别是当您担心线程多于内核时的扩展时。
同时,您是否有可能实际上有 8 个核超线程,看起来像 16 个 cpu?
清理后,您的代码如下所示:
package tests;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;
public class Test implements Runnable {
static final int n_threads = 8;
static final long LOOPS = 10000;
public static int global_counter;
public static CyclicBarrier thread_barrier = new CyclicBarrier(n_threads + 1);
public volatile int local_counter;
@Override
public void run() {
try {
runImpl();
} catch (InterruptedException | BrokenBarrierException e) {
//
}
}
void runImpl() throws InterruptedException, BrokenBarrierException {
for (int i = 0; i < LOOPS; i++) {
thread_barrier.await();
local_counter = 0;
for (int j=0; j<20; j++)
local_counter++;
thread_barrier.await();
}
}
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
Test[] ra = new Test[n_threads];
Thread[] ta = new Thread[n_threads];
for(int i=0; i<n_threads; i++)
(ta[i] = new Thread(ra[i]=new Test()).start();
long nanos = System.nanoTime();
for (int i = 0; i < LOOPS; i++) {
thread_barrier.await();
thread_barrier.await();
for (int t=0; t<ra.length; t++) {
global_counter += ra[t].local_counter;
}
}
System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");
Stream.of(ta).forEach(t -> t.interrupt());
}
}
Run Code Online (Sandbox Code Playgroud)
我的 1 锁版本如下所示:
package tests;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
public class TwoPhaseCycle implements Runnable {
static final boolean DEBUG = false;
static final int N = 8;
static final int LOOPS = 10000;
static ReentrantLock lock = new ReentrantLock();
static Condition testResume = lock.newCondition();
static volatile long cycle = -1;
static Condition mainResume = lock.newCondition();
static volatile int testLeft = 0;
static void p(Object msg) {
System.out.println(Thread.currentThread().getName()+"] "+msg);
}
//-----
volatile int local_counter;
@Override
public void run() {
try {
runImpl();
} catch (InterruptedException e) {
p("interrupted; ending.");
}
}
public void runImpl() throws InterruptedException {
lock.lock();
try {
if(DEBUG) p("waiting for 1st testResumed");
while(cycle<0) {
testResume.await();
}
} finally {
lock.unlock();
}
long localCycle = 0;//for (int i = 0; i < LOOPS; i++) {
while(true) {
if(DEBUG) p("working");
local_counter = 0;
for (int j = 0; j<20; j++)
local_counter++;
localCycle++;
lock.lock();
try {
if(DEBUG) p("done");
if(--testLeft <=0)
mainResume.signalAll(); //could have been just .signal() since only main is waiting, but safety first.
if(DEBUG) p("waiting for cycle "+localCycle+" testResumed");
while(cycle < localCycle) {
testResume.await();
}
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
TwoPhaseCycle[] ra = new TwoPhaseCycle[N];
Thread[] ta = new Thread[N];
for(int i=0; i<N; i++)
(ta[i] = new Thread(ra[i]=new TwoPhaseCycle(), "\t\t\t\t\t\t\t\t".substring(0, i%8)+"\tT"+i)).start();
long nanos = System.nanoTime();
int global_counter = 0;
for (int i=0; i<LOOPS; i++) {
lock.lock();
try {
if(DEBUG) p("gathering");
for (int t=0; t<ra.length; t++) {
global_counter += ra[t].local_counter;
}
testLeft = N;
cycle = i;
if(DEBUG) p("resuming cycle "+cycle+" tests");
testResume.signalAll();
if(DEBUG) p("waiting for main resume");
while(testLeft>0) {
mainResume.await();
}
} finally {
lock.unlock();
}
}
System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");
p(global_counter);
Stream.of(ta).forEach(t -> t.interrupt());
}
}
Run Code Online (Sandbox Code Playgroud)
当然,这绝不是一个稳定的微基准,但趋势表明它更快。希望你喜欢。(我放弃了一些最喜欢的调试技巧,值得将调试变为真......)
| 归档时间: |
|
| 查看次数: |
359 次 |
| 最近记录: |