Sun*_*vil 48 c parallel-processing mpi
如何将二维数组块发送到不同的处理器?假设2D阵列大小为400x400,我想将大小为100X100的块发送到不同的处理器.这个想法是每个处理器将在其单独的块上执行计算,并将其结果发送回第一个处理器以获得最终结果.
我在C程序中使用MPI.
Jon*_*rsi 137
首先让我说你通常不想这样做 - 分散并从一些"主"过程中收集大量数据.通常,您希望每个任务都能完成自己的工作,并且您的目标是永远不要让一个处理器需要整个数据的"全局视图"; 只要您需要,就可以限制可扩展性和问题大小.如果您正在为I/O执行此操作 - 一个进程读取数据,然后将其分散,然后将其收集回来进行写入,您最终将需要查看MPI-IO.
但是,为了解决您的问题,MPI有很好的方法可以将任意数据从内存中拉出来,并将其分散/收集到一组处理器中.不幸的是,这需要相当数量的MPI概念--MPI类型,范围和集体操作.在这个问题的答案中讨论了很多基本思想 - MPI_Type_create_subarray和MPI_Gather.
更新 - 在白天的冷光下,这是很多代码而不是很多解释.所以让我稍微扩展一下.
考虑一个1d整数全局数组,任务0有你想要分配给许多MPI任务,这样它们每个都得到一个本地数组.假设你有4个任务,全局数组是[01234567].您可以让任务0发送四条消息(包括一条消息)来分发它,当它需要重新组装时,接收四条消息将它们捆绑在一起; 但这显然在大量流程中非常耗时.有针对这些操作的优化例程 - 分散/收集操作.所以在这个1d案例中你会做这样的事情:
int global[8]; /* only task 0 has this */
int local[2]; /* everyone has this */
const int root = 0; /* the processor with the initial global data */
if (rank == root) {
for (int i=0; i<7; i++) global[i] = i;
}
MPI_Scatter(global, 2, MPI_INT, /* send everyone 2 ints from global */
local, 2, MPI_INT, /* each proc receives 2 ints into local */
root, MPI_COMM_WORLD); /* sending process is root, all procs in */
/* MPI_COMM_WORLD participate */
Run Code Online (Sandbox Code Playgroud)
在此之后,处理器的数据看起来像
task 0: local:[01] global: [01234567]
task 1: local:[23] global: [garbage-]
task 2: local:[45] global: [garbage-]
task 3: local:[67] global: [garbage-]
Run Code Online (Sandbox Code Playgroud)
也就是说,分散操作采用全局数组并将连续的2-int块发送到所有处理器.
要重新组装数组,我们使用的MPI_Gather()操作完全相同但反过来:
for (int i=0; i<2; i++)
local[i] = local[i] + rank;
MPI_Gather(local, 2, MPI_INT, /* everyone sends 2 ints from local */
global, 2, MPI_INT, /* root receives 2 ints each proc into global */
root, MPI_COMM_WORLD); /* recv'ing process is root, all procs in */
/* MPI_COMM_WORLD participate */
Run Code Online (Sandbox Code Playgroud)
现在数据看起来像
task 0: local:[01] global: [0134679a]
task 1: local:[34] global: [garbage-]
task 2: local:[67] global: [garbage-]
task 3: local:[9a] global: [garbage-]
Run Code Online (Sandbox Code Playgroud)
Gather带回所有数据,这里是10,因为在开始这个例子时我没有仔细考虑我的格式.
如果数据点的数量不能均匀地划分进程数,会发生什么?我们需要向每个进程发送不同数量的项目?然后,您需要一个通用版本的scatter,MPI_Scatterv()它允许您指定每个处理器的计数和位移 - 位于全局数组中的那条数据的起始位置.因此,假设您有一个[abcdefghi]包含9个字符的字符数组,并且您将为每个进程分配两个字符,除了最后一个字符,其中有三个字符.然后你需要
char global[9]; /* only task 0 has this */
char local[3]={'-','-','-'}; /* everyone has this */
int mynum; /* how many items */
const int root = 0; /* the processor with the initial global data */
if (rank == 0) {
for (int i=0; i<8; i++) global[i] = 'a'+i;
}
int counts[4] = {2,2,2,3}; /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6}; /* the starting point of everyone's data */
/* in the global array */
MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
MPI_INT,
local, mynum, MPI_INT; /* I'm receiving mynum MPI_INTs into local */
root, MPI_COMM_WORLD);
Run Code Online (Sandbox Code Playgroud)
现在数据看起来像
task 0: local:[ab-] global: [abcdefghi]
task 1: local:[cd-] global: [garbage--]
task 2: local:[ef-] global: [garbage--]
task 3: local:[ghi] global: [garbage--]
Run Code Online (Sandbox Code Playgroud)
您现在使用scatterv来分发不规则数据量.在每种情况下,位移是两个*级别(以字符为单位;位移是以散布形式发送的类型为单位的单位;或者是从数组的起点开始,它通常不以字节为单位),并且计数是{2,2,2,3}.如果它是我们希望有3个字符的第一个处理器,我们将设置count = {3,2,2,2}并且位移将是{0,3,5,7}.Gatherv再次完全相同但反过来; count和displs数组将保持不变.
现在,对于2D,这有点棘手.如果我们想发送2d数组的2d子锁,我们现在发送的数据不再是连续的.如果我们向4个处理器发送(比方说)6x6阵列的3x3子块,我们发送的数据中有漏洞:
2D Array
---------
|000|111|
|000|111|
|000|111|
|---+---|
|222|333|
|222|333|
|222|333|
---------
Actual layout in memory
[000111000111000111222333222333222333]
Run Code Online (Sandbox Code Playgroud)
(请注意,所有高性能计算都归结为了解内存中数据的布局.)
如果我们想要将标记为"1"的数据发送到任务1,我们需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值.第二个复杂因素是次区域停止和开始; 请注意,区域"1"不会在区域"0"停止的地方开始; 在区域"0"的最后一个元素之后,存储器中的下一个位置是通过区域"1"的中途.
让我们首先解决第一个布局问题 - 如何只提取我们想要发送的数据.我们总是可以将所有"0"区域数据复制到另一个连续的数组,然后发送; 如果我们足够仔细地计划出来,我们甚至可以这样做,以便我们可以调用MPI_Scatter结果.但我们宁愿不必以这种方式转换整个主数据结构.
到目前为止,我们使用的所有MPI数据类型都是简单的 - MPI_INT指定(比方说)连续4个字节.但是,MPI允许您创建自己的数据类型,以描述内存中任意复杂的数据布局.而这种情况 - 数组的矩形子区域 - 很常见,因此有一个特定的调用.对于我们上面描述的二维情况,
MPI_Datatype newtype;
int sizes[2] = {6,6}; /* size of global array */
int subsizes[2] = {3,3}; /* size of sub-region */
int starts[2] = {0,0}; /* let's say we're looking at region "0",
which begins at index [0,0] */
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
MPI_Type_commit(&newtype);
Run Code Online (Sandbox Code Playgroud)
这创建了一个从全局数组中仅选取区域"0"的类型; 我们现在可以将这段数据发送到另一个处理器
MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "0" */
Run Code Online (Sandbox Code Playgroud)
并且接收进程可以将其接收到本地阵列中.请注意,如果接收过程仅将其接收到3x3阵列中,则无法描述接收过程作为一种类型的接收过程newtype; 不再描述内存布局.相反,它只是接收一个3*3 = 9个整数的块:
MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);
Run Code Online (Sandbox Code Playgroud)
请注意,我们也可以为其他子区域执行此操作,方法是start为其他块创建不同的类型(具有不同的数组),或者仅通过在特定块的起始点发送:
MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "1" */
MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "2" */
MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "3" */
Run Code Online (Sandbox Code Playgroud)
最后,请注意,我们要求全局和本地在这里是连续的内存块; 即,&(global[0][0])和&(local[0][0])(或,等价地,*global和*local指向的存储器中连续6*6和3×3块;未被分配动态多d阵列的通常的方式保证它示出了如何下面做到这一点.
现在我们已经了解了如何指定子区域,在使用分散/聚集操作之前还有一件事要讨论,这就是这些类型的"大小".我们不能仅使用MPI_Scatter()(甚至是scatterv)这些类型,因为这些类型具有16个整数的范围; 也就是说,它们结束后它们的结尾是16个整数 - 它们结束的位置与下一个块的开始位置不一致,所以我们不能只使用分散 - 它会选择错误的位置来开始发送数据到下一个处理器.
当然,我们可以MPI_Scatterv()自己使用和指定位移,这就是我们要做的 - 除了位移以发送类型大小为单位,这对我们也没有帮助; 这些块从全局数组开始处的(0,3,18,21)个整数的偏移量开始,并且一个块从它开始的位置结束16个整数的事实不允许我们以整数倍数表示这些位移.
为了解决这个问题,MPI允许您为这些计算设置类型的范围.它不会截断类型; 它只是用于确定下一个元素在最后一个元素的起始位置.对于类似这些类型的类型,将范围设置为小于内存中距离类型实际末端的距离通常很方便.
我们可以将范围设定为对我们来说方便的任何事物.我们可以将范围设为1整数,然后以整数为单位设置位移.但是,在这种情况下,我喜欢将范围设置为3个整数 - 子行的大小 - 这样,块"1"在块"0"之后立即开始,块"3"在块之后立即开始" 2" .不幸的是,从块"2"跳到块"3"时,它的效果并不好,但这无济于事.
因此,为了在这种情况下分散子块,我们将执行以下操作:
MPI_Datatype type, resizedtype;
int sizes[2] = {6,6}; /* size of global array */
int subsizes[2] = {3,3}; /* size of sub-region */
int starts[2] = {0,0}; /* let's say we're looking at region "0",
which begins at index [0,0] */
/* as before */
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
/* change the extent of the type */
MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
MPI_Type_commit(&resizedtype);
Run Code Online (Sandbox Code Playgroud)
在这里,我们创建了与以前相同的块类型,但我们已经调整了它的大小; 我们没有改变"开始"类型的位置(0),但我们已经改变了它"结束"的地方(3个整数).我们之前没有提到这个,但是MPI_Type_commit要求能够使用这种类型; 但是你只需要提交实际使用的最终类型,而不是任何中间步骤.完成后MPI_Type_free用来释放类型.
所以现在,最后,我们可以分散块:上面的数据操作有点复杂,但是一旦完成,scatterv看起来就像之前一样:
int counts[4] = {1,1,1,1}; /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7}; /* the starting point of everyone's data */
/* in the global array, in block extents */
MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
resizedtype,
local, 3*3, MPI_INT; /* I'm receiving 3*3 MPI_INTs into local */
root, MPI_COMM_WORLD);
Run Code Online (Sandbox Code Playgroud)
现在我们已经完成了一段时间的分散,聚集和MPI衍生类型.
下面是一个示例代码,它显示了带有字符数组的聚集和分散操作.运行程序:
$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
Run Code Online (Sandbox Code Playgroud)
并且代码如下.
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"
int malloc2dchar(char ***array, int n, int m) {
/* allocate the n*m contiguous items */
char *p = (char *)malloc(n*m*sizeof(char));
if (!p) return -1;
/* allocate the row pointers into the memory */
(*array) = (char **)malloc(n*sizeof(char*));
if (!(*array)) {
free(p);
return -1;
}
/* set up the pointers into the contiguous memory */
for (int i=0; i<n; i++)
(*array)[i] = &(p[i*m]);
return 0;
}
int free2dchar(char ***array) {
/* free the memory - the first element of the array is at the start */
free(&((*array)[0][0]));
/* free the pointers into the memory */
free(*array);
return 0;
}
int main(int argc, char **argv) {
char **global, **local;
const int gridsize=10; // size of grid
const int procgridsize=2; // size of process grid
int rank, size; // rank of current process and no. of processes
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (size != procgridsize*procgridsize) {
fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
MPI_Abort(MPI_COMM_WORLD,1);
}
if (rank == 0) {
/* fill in the array, and print it */
malloc2dchar(&global, gridsize, gridsize);
for (int i=0; i<gridsize; i++) {
for (int j=0; j<gridsize; j++)
global[i][j] = '0'+(3*i+j)%10;
}
printf("Global array is:\n");
for (int i=0; i<gridsize; i++) {
for (int j=0; j<gridsize; j++)
putchar(global[i][j]);
printf("\n");
}
}
/* create the local array which we'll process */
malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);
/* create a datatype to describe the subarrays of the global array */
int sizes[2] = {gridsize, gridsize}; /* global size */
int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize}; /* local size */
int starts[2] = {0,0}; /* where this one starts */
MPI_Datatype type, subarrtype;
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
MPI_Type_commit(&subarrtype);
char *globalptr=NULL;
if (rank == 0) globalptr = &(global[0][0]);
/* scatter the array to all processors */
int sendcounts[procgridsize*procgridsize];
int displs[procgridsize*procgridsize];
if (rank == 0) {
for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
int disp = 0;
for (int i=0; i<procgridsize; i++) {
for (int j=0; j<procgridsize; j++) {
displs[i*procgridsize+j] = disp;
disp += 1;
}
disp += ((gridsize/procgridsize)-1)*procgridsize;
}
}
MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
0, MPI_COMM_WORLD);
/* now all processors print their local data: */
for (int p=0; p<size; p++) {
if (rank == p) {
printf("Local process on rank %d is:\n", rank);
for (int i=0; i<gridsize/procgridsize; i++) {
putchar('|');
for (int j=0; j<gridsize/procgridsize; j++) {
putchar(local[i][j]);
}
printf("|\n");
}
}
MPI_Barrier(MPI_COMM_WORLD);
}
/* now each processor has its local array, and can process it */
for (int i=0; i<gridsize/procgridsize; i++) {
for (int j=0; j<gridsize/procgridsize; j++) {
local[i][j] = 'A' + rank;
}
}
/* it all goes back to process 0 */
MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
globalptr, sendcounts, displs, subarrtype,
0, MPI_COMM_WORLD);
/* don't need the local data anymore */
free2dchar(&local);
/* or the MPI data type */
MPI_Type_free(&subarrtype);
if (rank == 0) {
printf("Processed grid:\n");
for (int i=0; i<gridsize; i++) {
for (int j=0; j<gridsize; j++) {
putchar(global[i][j]);
}
printf("\n");
}
free2dchar(&global);
}
MPI_Finalize();
return 0;
}
Run Code Online (Sandbox Code Playgroud)