Luc*_*yen 2 c parallel-processing openmp
我正在尝试使用 OpenMP 并行化以下 C++ 代码:
int np = 1000000;
double kk = 1 / pow(2 * pi, 2);
for (int kes = 1; kes <= 100; kes++) {
double E1 = 0;
#pragma omp parallel for reduction(+: E1)
for (int ies = 0; ies < np; ies++) {
for (int jes = 0; jes < np; jes++) {
if (ies != jes) {
float distanes = sqrt(pow(xp[ies] - xp[jes], 2) + pow(yp[ies] - yp[jes], 2) + pow(zp[ies] - zp[jes], 2));
float distan = kes * distanes;
if (distan <= 5) {
float gpspec = gpx[ies] * gpx[jes] + gpy[ies] * gpy[jes] + gpz[ies] * gpz[jes];
E1 = E1 + kk * gpspec * sin(kes * distanes) / (kes * distanes);
}
}
}
}
Ees[kes] = exp(-kes * pow(sp, 2) / 2) * E1;
}
Run Code Online (Sandbox Code Playgroud)
该代码是并行化的。然而,计算时间仍然很可怕。如何通过 n^2 运算加快计算速度?xp、yp、zp、gpx、gpy、gpz 是一维向量。
这个问题没有真正的答案,但我想提炼一下评论中讨论的一些更重要的优化。让我们只关注内部循环。
首先,您需要避免过多的乘法和函数调用。还有一些技巧不能保证编译器能够优化。例如,我们直观地知道pow(x, 2)只是对一个值进行平方,但如果您的编译器没有对此进行优化,那么它的效率比简单的要低得多x * x。
此外,还发现 O(N 2 ) 循环实际上可以减少到 O(N 2 /2),因为距离是对称的。pow如果您调用诸如和 之类的昂贵的东西,那么这是一件大事sqrt。您只需将最终结果缩放E12 即可补偿计算次数减半。
关于 的主题sqrt,还确定您不需要在距离测试之前这样做。之后再做,因为测试sqrt(d) < 5与 相同d < 25。
让我们更进一步,超越评论。请注意,该< 5测试实际上依赖于涉及 的乘法kes。如果您预先计算了一个也包含缩放的距离平方值kes,那么您的乘法就会更少。
您还可以kk从E1计算中删除该值。这不需要循环发生…… 可能吧。我的意思是,在所有这些计算中可能会出现浮点错误。所以每次你改变一些东西,你的最终结果可能会略有不同。无论如何我都会这么做。
那么……介绍完之后,我们就出发吧!
for (kes = 1; kes <= 100; kes++)
{
double E1 = 0;
const float distance_sq_thresh = 25.0f / kes / kes;
#pragma omp parallel for reduction(+: E1)
for (ies = 0; ies < np; ies++)
{
for (jes = ies+1; jes < np; jes++)
{
float dx = xp[ies] - xp[jes];
float dy = yp[ies] - yp[jes];
float dz = zp[ies] - zp[jes];
#if 0
// From Tanveer Badar's suggestion, if distances are generally large.
// This may be detrimental for large values of kes.
if (abs(dx) > distance_sq_thresh ||
abs(dy) > distance_sq_thresh ||
abs(dz) > distance_sq_thresh)
{
continue;
}
#endif
float distance_sq = dx * dx + dy * dy + dz * dz;
if (distance_sq <= distance_sq_thresh)
{
float distan = kes * sqrt(distance_sq);
float gpspec = gpx[ies] * gpx[jes] + gpy[ies] * gpy[jes] + gpz[ies] * gpz[jes];
E1 = E1 + gpspec * sin(distan) / distan;
}
}
}
Ees[kes] = exp(-kes * pow(sp, 2) / 2) * E1 * kk * 2.0f;
}
Run Code Online (Sandbox Code Playgroud)
现在,我保证这将比您现有的代码快很多。
如果您想更进一步,在每个kes循环中,您可以为可能的范围预先生成一个大的值表sin(distan) / distan,并在需要时对其进行索引。一般来说,三角运算和除法运算速度很慢。因此,如果您可以计算出可接受的误差容限并创建足够大的预计算表,这也可能是一个很好的优化。
您已经发布了一个答案,采纳了用户 dmuir 的建议,将循环kes作为内循环运行,以避免重复昂贵的计算。然而,在这个过程中,你也放弃了我在回答中阐述的一些原则。我在那里发表了评论,但让我把它们写成代码给你。
首先,预先计算平方距离阈值:
const double max_distance = 5.0;
double distance_sq_thresh[101] = {0};
for (kes = 1; kes <= 100; kes++)
{
distance_sq_thresh[kes] = max_distance * max_distance / kes / kes;
}
Run Code Online (Sandbox Code Playgroud)
现在,主要部分
const int np = 1000000;
for (ies = 0; ies < np; ies++){
for (jes = ies+1; jes < np; jes++){
double dxp = xp[ies] - xp[jes];
double dyp = yp[ies] - yp[jes];
double dzp = zp[ies] - zp[jes];
double distance_sq = dxp * dxp + dyp * dyp + dzp * dzp;
// If the first kes iteration won't pass the test, none will
if (distance_sq > distance_sq_thresh[1])
continue;
const double distance = sqrt(distance_sq);
const double gpspec = gpx[ies] * gpx[jes] + gpy[ies] * gpy[jes] + gpz[ies] * gpz[jes];
// We can keep adding distance to 'distan' instead of multiplying in a loop
double distan = 0.0;
for (kes = 1; kes <= 100; kes++){
// We know that the threshold decreases as kes increases, so break early
if (distance_sq > distance_sq_thresh[kes])
break;
E1[kes] = E1[kes] + gpspec * sin(distan) / distan;
distan += distance;
}
}
}
Run Code Online (Sandbox Code Playgroud)
最后应用结果。由于只有 100 个,因此进行这种平行处理实际上没有任何意义。
const double kk = 1.0 / pow(2.0 * pi, 2.0);
for (kes = 1; kes <= 100; kes++){
Ees[kes] = exp(-kes * pow(sp, 2) / 2) * E1[kes] * kk * 2.0f;
}
Run Code Online (Sandbox Code Playgroud)
您可以看到,使用这种方法,您不再能够并行化Ees数组计算,因为多个线程可能同时写入同一元素。也许 OpenMP 有一个指令允许某种数组缩减。我不知道。或许你可以去看看。
或者,您可以为每个 OpenMP 线程分配这些数组之一,并使用线程的序数来选择该数组。然后最后将数组添加在一起。本质上是滚动你自己的数组缩减。
E或者,您甚至可以通过在最外层循环内部定义一个本地数组(知道该数组对于当前 OpenMP 工作线程来说是本地的)来进行贫民区。然后,在jes循环之后,您可以获取主数组上的锁Ees并根据这些本地值更新它。由于jes循环执行 100 万次昂贵的计算,而锁定和更新仅执行 100 次加法,因此我预计大多数时间不会有线程被锁阻塞。