had*_*sed 5 java concurrency multithreading java.util.concurrent threadpool
我有一些可以轻松并行化的工作,并且我想使用 Java 线程在我的四核机器上分配工作。这是一种应用于旅行商问题的遗传算法。听起来不容易并行,但第一个循环很容易并行。我谈论实际演变的第二部分可能会也可能不会,但我想知道我是否因为实现线程的方式而变慢,或者是否因为算法本身而变慢。
另外,如果有人对我应该如何实现我想做的事情有更好的想法,我将非常感激。
在 main() 中,我有这个:
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(numThreads*numIter);
ThreadPoolExecutor tpool = new ThreadPoolExecutor(numThreads, numThreads, 10, TimeUnit.SECONDS, queue);
barrier = new CyclicBarrier(numThreads);
k.init(tpool);
Run Code Online (Sandbox Code Playgroud)
我有一个在 init() 内部完成的循环,如下所示:
for (int i = 0; i < numCities; i++) {
x[i] = rand.nextInt(width);
y[i] = rand.nextInt(height);
}
Run Code Online (Sandbox Code Playgroud)
我改成这样:
int errorCities = 0, stepCities = 0;
stepCities = numCities/numThreads;
errorCities = numCities - stepCities*numThreads;
// Split up work, assign to threads
for (int i = 1; i <= numThreads; i++) {
int startCities = (i-1)*stepCities;
int endCities = startCities + stepCities;
// This is a bit messy...
if(i <= numThreads) endCities += errorCities;
tpool.execute(new citySetupThread(startCities, endCities));
}
Run Code Online (Sandbox Code Playgroud)
这是 citySetupThread() 类:
public class citySetupThread implements Runnable {
int start, end;
public citySetupThread(int s, int e) {
start = s;
end = e;
}
public void run() {
for (int j = start; j < end; j++) {
x[j] = ThreadLocalRandom.current().nextInt(0, width);
y[j] = ThreadLocalRandom.current().nextInt(0, height);
}
try {
barrier.await();
} catch (InterruptedException ie) {
return;
} catch (BrokenBarrierException bbe) {
return;
}
}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码在程序中运行一次,因此它是我的线程构造的测试用例(这是我第一次使用 Java 线程)。我在真实的关键部分实现了同样的事情,特别是遗传算法的进化部分,其类如下:
public class evolveThread implements Runnable {
int start, end;
public evolveThread(int s, int e) {
start = s;
end = e;
}
public void run() {
// Get midpoint
int n = population.length/2, m;
for (m = start; m > end; m--) {
int i, j;
i = ThreadLocalRandom.current().nextInt(0, n);
do {
j = ThreadLocalRandom.current().nextInt(0, n);
} while(i == j);
population[m].crossover(population[i], population[j]);
population[m].mutate(numCities);
}
try {
barrier.await();
} catch (InterruptedException ie) {
return;
} catch (BrokenBarrierException bbe) {
return;
}
}
}
Run Code Online (Sandbox Code Playgroud)
它存在于在 init() 中调用的函数 evolution() 中,如下所示:
for (int p = 0; p < numIter; p++) evolve(p, tpool);
Run Code Online (Sandbox Code Playgroud)
是的,我知道这不是很好的设计,但由于其他原因我坚持使用它。进化内部是相关部分,如下所示:
// Threaded inner loop
int startEvolve = popSize - 1,
endEvolve = (popSize - 1) - (popSize - 1)/numThreads;
// Split up work, assign to threads
for (int i = 0; i < numThreads; i++) {
endEvolve = (popSize - 1) - (popSize - 1)*(i + 1)/numThreads + 1;
tpool.execute(new evolveThread(startEvolve, endEvolve));
startEvolve = endEvolve;
}
// Wait for our comrades
try {
barrier.await();
} catch (InterruptedException ie) {
return;
} catch (BrokenBarrierException bbe) {
return;
}
population[1].crossover(population[0], population[1]);
population[1].mutate(numCities);
population[0].mutate(numCities);
// Pick out the strongest
Arrays.sort(population, population[0]);
current = population[0];
generation++;
Run Code Online (Sandbox Code Playgroud)
我真正想知道的是:
“队列”有什么作用?我为池中所有线程执行的作业创建一个队列是否正确?如果大小不够大,我会收到 RejectedExecutionException。我只是决定执行 numThreads*numIterations 因为这就是会有多少个工作(对于我之前提到的实际进化方法)。但这很奇怪..如果barrier.await()正在工作,我不应该这样做,这导致我......
我是否正确使用了barrier.await()?目前我把它放在两个地方:Runnable 对象的 run() 方法内部,以及执行所有作业的 for 循环之后。我本以为只需要一个,但如果删除其中一个,就会出现错误。
我对线程的争用表示怀疑,因为这是我可以从荒谬的减速中收集到的唯一信息(它确实随输入参数而扩展)。我想知道这是否与我实现线程池和屏障的方式有关。如果没有,那么我想我将不得不查看 crossover() 和 mutate() 方法的内部。
首先,我认为您可能对 CyclicBarrier 的使用方式有疑问。目前,您正在使用执行程序线程数作为参与方数来初始化它。然而,你还有另外的一方;主线程。所以我认为你需要这样做:
barrier = new CyclicBarrier(numThreads + 1);
Run Code Online (Sandbox Code Playgroud)
我认为这应该可行,但我个人认为这是对屏障的奇怪使用。
当使用工作队列线程池模型时,我发现使用信号量或 Java 的 Future 模型更容易。
对于信号量:
class MyRunnable implements Runnable {
private final Semaphore sem;
public MyRunnable(Semaphore sem) {
this.sem = sem;
}
public void run() {
// do work
// signal complete
sem.release()
}
}
Run Code Online (Sandbox Code Playgroud)
然后在你的主线程中:
Semaphore sem = new Semaphore(0);
for (int i = 0; i < numJobs; ++i) {
threadPool.execute(new MyRunnable(sem));
}
sem.acquire(numJobs);
Run Code Online (Sandbox Code Playgroud)
它实际上与屏障做同样的事情,但我发现更容易考虑工作任务“发出信号”它们已完成,而不是再次与主线程“同步”。
例如,如果您查看CyclicBarrier JavaDoc中的示例代码,则调用barrier.await()位于工作程序内部的循环内。因此,它实际上是在同步多个长时间运行的工作线程,并且主线程不参与屏障。barrier.await()在循环外的工作线程末尾进行调用更多的是发出完成信号。
| 归档时间: |
|
| 查看次数: |
2862 次 |
| 最近记录: |