alf*_*lfC 6 c asynchronous callback mpi request
在MPI中,可以运行异步消息传递例程(例如,receive,with MPI_Irecv
).是否可以在请求完成后附加一个回调函数来执行?例如,处理收到的数据.
这是我正在寻找的一个例子:
#include "mpi.h"
#include <stdio.h>
void mycallback(void* data){
(int*)data += 1; // add one to the received data
}
int main(int argc, char *argv[]){
int myid, numprocs, left, right;
int buffer[10], buffer2[10];
MPI_Request request;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
right = (myid + 1) % numprocs;
left = myid - 1;
if (left < 0)
left = numprocs - 1;
MPI_Irecv(buffer, 10, MPI_INT, left, 123, MPI_COMM_WORLD, &request);
// Attach_Callback(request, &mycallback); //somewhere after this point recv is completed an f is executed
MPI_Send(buffer2, 10, MPI_INT, right, 123, MPI_COMM_WORLD);
MPI_Wait(&request, &status); //the recv and the callback must have been called at this point
MPI_Finalize();
return 0;
}
Run Code Online (Sandbox Code Playgroud)
我发现有一个MPI_Grequest_start
和MPI_Grequest_complete
函数,但它们似乎是为了其他东西,因为创建的请求与特定的消息传递无关.
也许我必须实现一个Grequest(通用请求),其中回调包含在MPI_Recv
(not MPI_Irecv
)中.这是个主意吗?
标准中没有这样的东西。
正如@AhmedMasud所说,您可以找到一种使用通用请求的方法: http://mpi-forum.org/docs/mpi-3.1/mpi31-report/node297.htm#Node297
正如您可以在其中读到的,由于我同意的某些原因(MPI 线程和程序线程之间的工作划分),该标准可能永远不会包含 Irecv 的回调。
您尝试做的事情并不是微不足道的,并且与很多可移植性问题相关。您应该问自己的问题:在 MPI 线程中执行回调真的会带来任何好处吗?我怀疑你心目中的这个好处是效率,但出于效率原因,应该避免 Irecv 和 Isend,非阻塞是一个很好的功能,但只有在没有其他选择的情况下才应该使用(例如输出服务器,其中您绝对不想浪费计算客户端的时间(但即使在这种情况下,缓冲发送通常更好,并导致您获得更大的带宽和更小的延迟))。
您需要的通信的真实结构是什么?如果是 0->1, 1->2 ... n-1->n, n->0 这段代码运行良好(并且比您的解决方案更快),那么您可以用您最喜欢的方式轻松定义回调这样做(解决问题的时间会少得多,调试变得更加容易:-)):
template<class Type>
void Parallel::sendUp(Type& bufferSend,
Type& bufferRec,
long len)
{
if(this->rank()%2==0)
{
if(this->rank()!=this->size()-1)
{
this->send(bufferSend,len,this->rank());
}
if(this->rank()!= 0)
{
this->receive(bufferRec,len,this->rank()-1);
}
else if(this->size()%2==0)
{
this->receive(bufferRec,len,this->size()-1);
}
}
else
{
this->receive( bufferRec, len , this->rank()-1);
if(this->grid_rank()!=this->grid_size()-1)
{
this->send(bufferSend,len,this->rank()+1);
}
else
{
this->send( bufferSend, len , 0);
}
}
if(this->size()%2!=0)
{
if(this->rank()==this->size()-1)
{
this->send( bufferSend, len , 0);
}
if(this->grid()==0)
{
this->receive(bufferRec, len , this->size()-1);
}
}
}
Run Code Online (Sandbox Code Playgroud)
在该代码中,并行对象“只是”一些 MPI 调用的包装器,只是为了简化调用:
parallel.rank() = rank in the comm
parallel.size() = size of the comm
parallel.send/rec() is defined as follow
template<class Type>
void Parallel::send(Type* array, int len, int to)
{
MPI_Send(array, len*sizeof(Type), MPI_BYTE, to, 0,comm_);
}
template<class Type>
void Parallel::rec(Type* array, int len, int to)
{
MPI_Send(array, len*sizeof(Type), MPI_BYTE, to, 0,comm_);
}
template<class Type>
MPI_Status Parallel2d::receive(Type& array, int from, int len)
{
MPI_Status status;
MPI_Recv( &array, len*sizeof(Type), MPI_BYTE, from, 0,comm_,&status);
return status;
}
Run Code Online (Sandbox Code Playgroud)
希望能帮助到你。