I am using the Open MPI library to implement the following algorithm: two processes, p1 and p2, both run some iterations, and at the end of every iteration they communicate their results. The problem is that execution is not necessarily balanced, so p1 may execute 10 iterations while p2 executes only 1. Even so, I want p2 to read the latest result from the last iteration p1 executed.
So my idea is that p1 sends its result at every iteration. But before sending the result from iteration i, it should check whether p2 actually read the message from iteration i-1. If not, it should cancel the previous send, so that when p2 next reads from p1 it gets the latest result.
Unfortunately, I don't know how to do that. I have tried using MPI_Cancel, as in the following code:
#include <stdio.h>
#include <unistd.h>
#include <mpi.h>

int main (int argc, char *argv[]){
    int myrank, numprocs;
    MPI_Status status;
    MPI_Request request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    if(myrank == 0){
        int send_buf = 1, flag;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &request);
        MPI_Cancel(&request);
        MPI_Wait(&request, &status);
        MPI_Test_cancelled(&status, &flag);
        if (flag) printf("Send cancelled\n");
        else printf("Send NOT cancelled\n");
        send_buf = 2;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &request);
    }
    else {
        sleep(5);
        int msg;
        MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status);
        printf("%d\n", msg);
    }
    MPI_Finalize();
    return 0;
}
But when I run it, it reports that the send could NOT be cancelled, and p2 prints 1 rather than 2.
I would like to know whether there is any way to achieve what I'm proposing, or whether there is an alternative way to code the behaviour between p1 and p2.
I would reverse the control of the communication. Instead of p1 sending unnecessary messages that it then has to cancel, p2 should signal that it is ready to receive a message, and p1 should send only then. In the meantime, p1 simply keeps overwriting its send buffer with the latest result.
In (untested) code:
if ( rank == 0 )
{
    int ready;   // receive buffer for p2's ready signal
    int flag;    // completion flag for MPI_Test
    MPI_Request p2_request;
    MPI_Status p2_status;
    // post the initial request for p2's ready signal
    MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
    for (int i=0; ; i++)
    {
        sleep(1);   // stand-in for one iteration of real work
        MPI_Test(&p2_request, &flag, &p2_status);
        if ( flag )
        {
            // blocking send: p2 is ready to receive
            MPI_Send(&i, 1, MPI_INT, 1, 123, MPI_COMM_WORLD);
            // post a new request for the next ready signal
            MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
        }
    }
}
else
{
    int msg = 0;
    MPI_Status status;
    while (1)
    {
        sleep(5);   // p2 iterates more slowly than p1
        // actual message content doesn't matter, just let p1 know we're ready
        MPI_Send(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD);
        // receive p1's most recent result
        MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status);
    }
}
Like I said, this is untested code, but you can probably see what I'm getting at. MPI_Cancel should only be used when things go horribly wrong: no message should be cancelled during normal execution. Note also that an implementation is permitted to refuse to cancel a send once the message has been handed off to the transport layer, which is exactly what you are seeing.
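For completeness, the one situation where cancellation is commonly considered legitimate is tearing down a speculative receive that you know will never be matched, e.g. on a shutdown path. A minimal sketch of that pattern (the surrounding context is assumed; cancelling an unmatched receive must be supported, whereas cancelling a send may legitimately fail):

/* Teardown sketch (assumed context, not part of the protocol above):
 * a receive posted "just in case" is cancelled once we know that
 * no message will ever match it. */
int dummy;
MPI_Request req;
MPI_Irecv(&dummy, 1, MPI_INT, MPI_ANY_SOURCE, 123, MPI_COMM_WORLD, &req);

/* ... later, on the shutdown path, nothing more is coming ... */
MPI_Cancel(&req);
MPI_Wait(&req, MPI_STATUS_IGNORE);   /* completes the cancelled request */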
Another approach entirely is to use MPI one-sided communication (e.g., http://www.linux-mag.com/id/1793). Note, though, that doing the passive communication, which is what you really want here, is fairly tricky (although the pairwise mpi_win_post and mpi_win_start are easier), and that the one-sided stuff hopefully all changes in MPI-3, so I don't know how far down that road I'd advise you to go.
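For a flavour of what the passive-target route might look like, here is a minimal (untested) sketch using MPI-2 MPI_Win_lock/MPI_Win_unlock: p1 publishes its newest result into a window, and p2 reads whatever is there whenever it is ready. The loop counts and sleeps are purely illustrative stand-ins for real work.

/* Sketch: p2 reads p1's latest result via passive-target one-sided MPI.
 * Assumes MPI-2; window layout, loop counts and sleeps are illustrative. */
#include <stdio.h>
#include <unistd.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int rank;
    double latest = 0.0;   /* p1 exposes this through the window */
    MPI_Win win;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Window creation is collective; only rank 0's memory is ever read. */
    MPI_Win_create(&latest, sizeof(double), sizeof(double),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    if (rank == 0) {
        for (int i = 0; i < 10; i++) {
            sleep(1);   /* stand-in for one iteration of work */
            /* Lock our own window before touching the exposed memory. */
            MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, win);
            latest = i + 1.0;   /* overwrite with the newest result */
            MPI_Win_unlock(0, win);
        }
    } else if (rank == 1) {
        double copy;
        for (int i = 0; i < 3; i++) {
            sleep(3);   /* p2 iterates more slowly */
            /* Passively read whatever p1 has published most recently. */
            MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
            MPI_Get(&copy, 1, MPI_DOUBLE, 0, 0, 1, MPI_DOUBLE, win);
            MPI_Win_unlock(0, win);
            printf("[1] latest from p1: %lf\n", copy);
        }
    }

    MPI_Win_free(&win);   /* collective: all ranks must reach this */
    MPI_Finalize();
    return 0;
}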
More directly related to your first attempt here: rather than cancelling messages (which, as mentioned above, is pretty drastic), it is probably much easier just to go through all the queued messages (MPI guarantees that messages will not overtake each other; the only caveat is if you are using MPI_THREAD_MULTIPLE and sending from multiple threads within one MPI task, in which case the ordering is only loosely defined):
#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>

void compute() {
    const int maxusecs=500;
    unsigned long sleepytime=(unsigned long)round(((float)rand()/RAND_MAX)*maxusecs);
    usleep(sleepytime);
}

int main(int argc, char** argv)
{
    int rank, size, i;
    int otherrank;
    const int niters=10;
    const int tag=5;
    double newval;
    double sentvals[niters+1];
    double othernewval;
    MPI_Request reqs[niters+1];
    MPI_Status stat;
    int ready;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (size != 2) {
        fprintf(stderr,"This assumes 2 processes\n");
        MPI_Finalize();
        exit(-1);
    }

    otherrank = (rank == 0 ? 1 : 0);
    srand(rank);

    compute();
    newval = rank * 100. + 0;
    sentvals[0] = newval;
    MPI_Isend(&(sentvals[0]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0]));
    MPI_Recv (&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);

    for (i=0; i<niters; i++) {
        /* drain everything already queued so we act on the newest value */
        MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
        while (ready) {
            MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
            printf("%s[%d]: Reading queued data %lf:\n",
                   (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
            MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
        }

        printf("%s[%d]: Got data %lf, computing:\n",
               (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
        compute();

        /* update my data */
        newval = rank * 100. + i + 1;
        printf("%s[%d]: computed %lf, sending:\n",
               (rank == 0 ? "" : "\t\t\t\t"), rank, newval);
        sentvals[i+1] = newval;
        /* each send gets its own buffer slot and its own request handle */
        MPI_Isend(&(sentvals[i+1]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[i+1]));
    }

    MPI_Finalize();
    return 0;
}
Running this gives you (notice that just because data is sent doesn't mean it has been received at the time of printing):
[0]: Got data 100.000000, computing:
[1]: Got data 0.000000, computing:
[0]: computed 1.000000, sending:
[0]: Got data 100.000000, computing:
[1]: computed 101.000000, sending:
[1]: Got data 0.000000, computing:
[0]: computed 2.000000, sending:
[0]: Got data 100.000000, computing:
[1]: computed 102.000000, sending:
[1]: Reading queued data 1.000000:
[1]: Got data 1.000000, computing:
[0]: computed 3.000000, sending:
[0]: Reading queued data 101.000000:
[0]: Got data 101.000000, computing:
[1]: computed 103.000000, sending:
[1]: Reading queued data 2.000000:
[1]: Got data 2.000000, computing:
[0]: computed 4.000000, sending:
[1]: computed 104.000000, sending:
[0]: Reading queued data 102.000000:
[1]: Reading queued data 3.000000:
[1]: Got data 3.000000, computing:
[0]: Got data 102.000000, computing:
[0]: computed 5.000000, sending:
[0]: Reading queued data 103.000000:
[0]: Got data 103.000000, computing:
[1]: computed 105.000000, sending:
[1]: Reading queued data 4.000000:
[1]: Got data 4.000000, computing:
[0]: computed 6.000000, sending:
[0]: Reading queued data 104.000000:
[0]: Got data 104.000000, computing:
[1]: computed 106.000000, sending:
[1]: Reading queued data 5.000000:
[1]: Got data 5.000000, computing:
[0]: computed 7.000000, sending:
[0]: Reading queued data 105.000000:
[0]: Got data 105.000000, computing:
[1]: computed 107.000000, sending:
[1]: Reading queued data 6.000000:
[1]: Got data 6.000000, computing:
[0]: computed 8.000000, sending:
[0]: Reading queued data 106.000000:
[0]: Got data 106.000000, computing:
[1]: computed 108.000000, sending:
[1]: Reading queued data 7.000000:
[1]: Got data 7.000000, computing:
[0]: computed 9.000000, sending:
[0]: Reading queued data 107.000000:
[0]: Got data 107.000000, computing:
[1]: computed 109.000000, sending:
[1]: Reading queued data 8.000000:
[1]: Got data 8.000000, computing:
[0]: computed 10.000000, sending:
[1]: computed 110.000000, sending:
Note that this is just demonstration code; a final version really needs waitalls and more iprobes at the end, to free any pending requests and flush any waiting messages.
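A sketch of what that cleanup might look like, placed just before the MPI_Finalize() in the code above (a hypothetical completion of the note; a robust version would additionally need the two ranks to agree that no more sends are coming, since MPI_Iprobe can report no message while one is still in flight):

/* Drain any results still queued from the other rank ... */
MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
while (ready) {
    MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
    MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
}
/* ... then complete every send we posted. */
MPI_Waitall(niters+1, reqs, MPI_STATUSES_IGNORE);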