MPI: Cancelling a non-blocking send

kun*_*ami 1 c parallel-processing mpi

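I'm using the Open MPI library to implement the following algorithm: we have two processes, p1 and p2. Both of them perform some iterations, and at the end of each iteration they communicate their results. The problem is that the execution is not necessarily balanced, so p1 may perform 10 iterations in the time p2 performs 1. Even so, I want p2 to read the latest result, from the last iteration p1 completed.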

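So my idea is that p1 sends its result at every iteration. However, before sending the result of iteration i, it should check whether p2 has actually read the information from iteration i-1. If not, it should cancel the previous send, so that when p2 reads from p1 it gets the latest result.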

Unfortunately, I don't know how to do this. I tried using MPI_Cancel, as in the code below:

#include <mpi.h>
#include <stdio.h>
#include <unistd.h>

int main (int argc, char *argv[]){

    int myrank, numprocs;
    MPI_Status status;
    MPI_Request request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    if(myrank == 0){
        int send_buf = 1, flag;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, 
                  &request);
        MPI_Cancel(&request);
        MPI_Wait(&request, &status);
        MPI_Test_cancelled(&status, &flag);
        if (flag) printf("Send cancelled\n");
        else printf("Send NOT cancelled\n");
        send_buf = 2;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, 
                  &request);
    }
    else {
        sleep(5);
        int msg;
        MPI_Recv(&msg, 1, MPI_INT, 0, 123,
                 MPI_COMM_WORLD, &status);
        printf("%d\n", msg);
    }
    MPI_Finalize();

    return 0;
}

But when I run it, it reports that the send could not be cancelled, and p2 prints 1 instead of 2.

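I'd like to know whether there is any way to achieve what I proposed, or whether there is an alternative way to code the behavior between p1 and p2.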

sus*_*att 5

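I would invert the control of the communication. Instead of p1 sending unnecessary messages that then have to be cancelled, p2 should signal that it is ready to receive a message, and p1 should send only at that point. In the meantime, p1 simply overwrites its send buffer with the latest result.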

In (untested) code:

if ( rank == 0 )
{
    int ready_msg;   // receive buffer for p2's "ready" message
    int flag;        // MPI_Test's completion flag (must not share storage with ready_msg)
    MPI_Request p2_request;
    MPI_Status p2_status;
    // initial request
    MPI_Irecv(&ready_msg, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
    for (int i=0; ; i++)
    {
        sleep(1);    // stand-in for one iteration; i plays the role of the latest result
        MPI_Test(&p2_request, &flag, &p2_status);
        if ( flag )
        {
            // blocking send: p2 is ready to receive
            MPI_Send(&i, 1, MPI_INT, 1, 123, MPI_COMM_WORLD);
            // post new request
            MPI_Irecv(&ready_msg, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
        }
    }
}
else
{
    int msg = 0;
    MPI_Status status;
    while (1)
    {
        sleep(5);
        // actual message content doesn't matter, just let p1 know we're ready
        MPI_Send(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD);
        // receive the latest result
        MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status);
    }
}

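As I said, this is untested code, but you can probably see what I'm getting at. MPI_Cancel should only be used when something has gone badly wrong: during normal execution, no message should ever need to be cancelled.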


Jon*_*rsi 5

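A completely different approach is to use MPI one-sided communication (e.g., http://www.linux-mag.com/id/1793). Be aware, though, that doing passive-target communication, which is what you really want here, is fairly tricky (pairwise mpi_win_post and mpi_win_start is easier), and the one-sided interface is expected to change considerably in MPI-3, so I'm not sure how far down that road I'd advise you to go.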

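More directly related to what you first tried here: rather than cancelling messages (which, as mentioned above, is pretty drastic), it may be much easier to just work through all of the queued messages. MPI guarantees that messages sent from one process to another won't overtake each other; the only caveat is if you are using MPI_THREAD_MULTIPLE with several threads sending within one MPI task, in which case the ordering is less well defined: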

#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>

void compute() {
    const int maxusecs=500;
    unsigned long sleepytime=(unsigned long)round(((float)rand()/RAND_MAX)*maxusecs);

    usleep(sleepytime);
}

int main(int argc, char** argv)
{
  int rank, size, i;
  int otherrank;
  const int niters=10;
  const int tag=5;
  double newval;
  double sentvals[niters+1];
  double othernewval;
  MPI_Request reqs[niters+1];
  MPI_Status stat;
  int ready;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  if (size != 2) {
     fprintf(stderr,"This assumes 2 processes\n");
     MPI_Finalize();
     exit(-1);
  }

  otherrank = (rank == 0 ? 1 : 0);
  srand(rank);

  compute();
  newval = rank * 100. + 0;
  sentvals[0] = newval;
  MPI_Isend(&(sentvals[0]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0]));
  MPI_Recv (&othernewval,   1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
  for (i=0; i<niters; i++) {

      MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
      while (ready) {
          MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
          printf("%s[%d]: Reading queued data %lf:\n",
                  (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
          MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
      }

      printf("%s[%d]: Got data %lf, computing:\n", 
              (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
      compute();

      /* update my data */ 
      newval = rank * 100. + i + 1;
      printf("%s[%d]: computed %lf, sending:\n", 
              (rank == 0 ? "" : "\t\t\t\t"), rank, newval);
      sentvals[i+1] = newval;
      MPI_Isend(&(sentvals[i+1]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[i+1]));
   }


  MPI_Finalize();

  return 0;
}

Running it gives the following (note that a value reported as sent has not necessarily been received by the time it is printed):

[0]: Got data 100.000000, computing:
                                [1]: Got data 0.000000, computing:
[0]: computed 1.000000, sending:
[0]: Got data 100.000000, computing:
                                [1]: computed 101.000000, sending:
                                [1]: Got data 0.000000, computing:
[0]: computed 2.000000, sending:
[0]: Got data 100.000000, computing:
                                [1]: computed 102.000000, sending:
                                [1]: Reading queued data 1.000000:
                                [1]: Got data 1.000000, computing:
[0]: computed 3.000000, sending:
[0]: Reading queued data 101.000000:
[0]: Got data 101.000000, computing:
                                [1]: computed 103.000000, sending:
                                [1]: Reading queued data 2.000000:
                                [1]: Got data 2.000000, computing:
[0]: computed 4.000000, sending:
                                [1]: computed 104.000000, sending:
[0]: Reading queued data 102.000000:
                                [1]: Reading queued data 3.000000:
                                [1]: Got data 3.000000, computing:
[0]: Got data 102.000000, computing:
[0]: computed 5.000000, sending:
[0]: Reading queued data 103.000000:
[0]: Got data 103.000000, computing:
                                [1]: computed 105.000000, sending:
                                [1]: Reading queued data 4.000000:
                                [1]: Got data 4.000000, computing:
[0]: computed 6.000000, sending:
[0]: Reading queued data 104.000000:
[0]: Got data 104.000000, computing:
                                [1]: computed 106.000000, sending:
                                [1]: Reading queued data 5.000000:
                                [1]: Got data 5.000000, computing:
[0]: computed 7.000000, sending:
[0]: Reading queued data 105.000000:
[0]: Got data 105.000000, computing:
                                [1]: computed 107.000000, sending:
                                [1]: Reading queued data 6.000000:
                                [1]: Got data 6.000000, computing:
[0]: computed 8.000000, sending:
[0]: Reading queued data 106.000000:
[0]: Got data 106.000000, computing:
                                [1]: computed 108.000000, sending:
                                [1]: Reading queued data 7.000000:
                                [1]: Got data 7.000000, computing:
[0]: computed 9.000000, sending:
[0]: Reading queued data 107.000000:
[0]: Got data 107.000000, computing:
                                [1]: computed 109.000000, sending:
                                [1]: Reading queued data 8.000000:
                                [1]: Got data 8.000000, computing:
[0]: computed 10.000000, sending:
                                [1]: computed 110.000000, sending:

Note that this is only demo code; a final version really needs waitalls and more iprobes in there to free any pending requests and flush any waiting messages.