使用MPI在C中发送2D阵列块

Sun*_*vil 48 c parallel-processing mpi

如何将二维数组块发送到不同的处理器?假设2D阵列大小为400x400,我想将大小为100X100的块发送到不同的处理器.这个想法是每个处理器将在其单独的块上执行计算,并将其结果发送回第一个处理器以获得最终结果.
我在C程序中使用MPI.

Jon*_*rsi 137

首先让我说你通常不想这样做 - 分散并从一些"主"过程中收集大量数据.通常,您希望每个任务都能完成自己的工作,并且您的目标是永远不要让一个处理器需要整个数据的"全局视图"; 只要您需要,就可以限制可扩展性和问题大小.如果您正在为I/O执行此操作 - 一个进程读取数据,然后将其分散,然后将其收集回来进行写入,您最终将需要查看MPI-IO.

但是,为了解决您的问题,MPI有很好的方法可以将任意数据从内存中拉出来,并将其分散/收集到一组处理器中.不幸的是,这需要相当数量的MPI概念--MPI类型,范围和集体操作.在这个问题的答案中讨论了很多基本思想 - MPI_Type_create_subarray和MPI_Gather.

更新 - 在白天的冷光下,这是很多代码而不是很多解释.所以让我稍微扩展一下.

考虑一个1d整数全局数组,任务0有你想要分配给许多MPI任务,这样它们每个都得到一个本地数组.假设你有4个任务,全局数组是[01234567].您可以让任务0发送四条消息(包括一条消息)来分发它,当它需要重新组装时,接收四条消息将它们捆绑在一起; 但这显然在大量流程中非常耗时.有针对这些操作的优化例程 - 分散/收集操作.所以在这个1d案例中你会做这样的事情:

int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) {
   for (int i=0; i<7; i++) global[i] = i;
}

MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */
Run Code Online (Sandbox Code Playgroud)

在此之后,处理器的数据看起来像

task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]
Run Code Online (Sandbox Code Playgroud)

也就是说,分散操作采用全局数组并将连续的2-int块发送到所有处理器.

要重新组装数组,我们使用的MPI_Gather()操作完全相同但反过来:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */
Run Code Online (Sandbox Code Playgroud)

现在数据看起来像

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]
Run Code Online (Sandbox Code Playgroud)

Gather带回所有数据,这里是10,因为在开始这个例子时我没有仔细考虑我的格式.

如果数据点的数量不能均匀地划分进程数,会发生什么?我们需要向每个进程发送不同数量的项目?然后,您需要一个通用版本的scatter,MPI_Scatterv()它允许您指定每个处理器的计数和位移 - 位于全局数组中的那条数据的起始位置.因此,假设您有一个[abcdefghi]包含9个字符的字符数组,并且您将为每个进程分配两个字符,除了最后一个字符,其中有三个字符.然后你需要

char global[9];   /* only task 0 has this */
char local[3]={'-','-','-'};    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) {
   for (int i=0; i<8; i++) global[i] = 'a'+i;
}

int counts[4] = {2,2,2,3};   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6};   /* the starting point of everyone's data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_INT,      
            local, mynum, MPI_INT;   /* I'm receiving mynum MPI_INTs into local */
            root, MPI_COMM_WORLD);
Run Code Online (Sandbox Code Playgroud)

现在数据看起来像

task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]
Run Code Online (Sandbox Code Playgroud)

您现在使用scatterv来分发不规则数据量.在每种情况下,位移是两个*级别(以字符为单位;位移是以散布形式发送的类型为单位的单位;或者是从数组的起点开始,它通常不以字节为单位),并且计数是{2,2,2,3}.如果它是我们希望有3个字符的第一个处理器,我们将设置count = {3,2,2,2}并且位移将是{0,3,5,7}.Gatherv再次完全相同但反过来; count和displs数组将保持不变.

现在,对于2D,这有点棘手.如果我们想发送2d数组的2d子锁,我们现在发送的数据不再是连续的.如果我们向4个处理器发送(比方说)6x6阵列的3x3子块,我们发送的数据中有漏洞:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]
Run Code Online (Sandbox Code Playgroud)

(请注意,所有高性能计算都归结为了解内存中数据的布局.)

如果我们想要将标记为"1"的数据发送到任务1,我们需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值.第二个复杂因素是次区域停止和开始; 请注意,区域"1"不会在区域"0"停止的地方开始; 在区域"0"的最后一个元素之后,存储器中的下一个位置是通过区域"1"的中途.

让我们首先解决第一个布局问题 - 如何只提取我们想要发送的数据.我们总是可以将所有"0"区域数据复制到另一个连续的数组,然后发送; 如果我们足够仔细地计划出来,我们甚至可以这样做,以便我们可以调用MPI_Scatter结果.但我们宁愿不必以这种方式转换整个主数据结构.

到目前为止,我们使用的所有MPI数据类型都是简单的 - MPI_INT指定(比方说)连续4个字节.但是,MPI允许您创建自己的数据类型,以描述内存中任意复杂的数据布局.而这种情况 - 数组的矩形子区域 - 很常见,因此有一个特定的调用.对于我们上面描述的二维情况,

    MPI_Datatype newtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);
Run Code Online (Sandbox Code Playgroud)

这创建了一个从全局数组中仅选取区域"0"的类型; 我们现在可以将这段数据发送到另一个处理器

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */
Run Code Online (Sandbox Code Playgroud)

并且接收进程可以将其接收到本地阵列中.请注意,如果接收过程仅将其接收到3x3阵列中,则无法描述接收过程作为一种类型的接收过程newtype; 不再描述内存布局.相反,它只是接收一个3*3 = 9个整数的块:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);
Run Code Online (Sandbox Code Playgroud)

请注意,我们也可以为其他子区域执行此操作,方法是start为其他块创建不同的类型(具有不同的数组),或者仅通过在特定块的起始点发送:

    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */
Run Code Online (Sandbox Code Playgroud)

最后,请注意,我们要求全局和本地在这里是连续的内存块; 即,&(global[0][0])&(local[0][0])(或,等价地,*global*local指向的存储器中连续6*6和3×3块;未被分配动态多d阵列的通常的方式保证它示出了如何下面做到这一点.

现在我们已经了解了如何指定子区域,在使用分散/聚集操作之前还有一件事要讨论,这就是这些类型的"大小".我们不能仅使用MPI_Scatter()(甚至是scatterv)这些类型,因为这些类型具有16个整数的范围; 也就是说,它们结束后它们的结尾是16个整数 - 它们结束的位置与下一个块的开始位置不一致,所以我们不能只使用分散 - 它会选择错误的位置来开始发送数据到下一个处理器.

当然,我们可以MPI_Scatterv()自己使用和指定位移,这就是我们要做的 - 除了位移以发送类型大小为单位,这对我们也没有帮助; 这些块从全局数组开始处的(0,3,18,21)个整数的偏移量开始,并且一个块从它开始的位置结束16个整数的事实不允许我们以整数倍数表示这些位移.

为了解决这个问题,MPI允许您为这些计算设置类型的范围.它不会截断类型; 它只是用于确定下一个元素在最后一个元素的起始位置.对于类似这些类型的类型,将范围设置为小于内存中距离类型实际末端的距离通常很方便.

我们可以将范围设定为对我们来说方便的任何事物.我们可以将范围设为1整数,然后以整数为单位设置位移.但是,在这种情况下,我喜欢将范围设置为3个整数 - 子行的大小 - 这样,块"1"在块"0"之后立即开始,块"3"在块之后立即开始" 2" .不幸的是,从块"2"跳到块"3"时,它的效果并不好,但这无济于事.

因此,为了在这种情况下分散子块,我们将执行以下操作:

    MPI_Datatype type, resizedtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);
Run Code Online (Sandbox Code Playgroud)

在这里,我们创建了与以前相同的块类型,但我们已经调整了它的大小; 我们没有改变"开始"类型的位置(0),但我们已经改变了它"结束"的地方(3个整数).我们之前没有提到这个,但是MPI_Type_commit要求能够使用这种类型; 但是你只需要提交实际使用的最终类型,而不是任何中间步骤.完成后MPI_Type_free用来释放类型.

所以现在,最后,我们可以分散块:上面的数据操作有点复杂,但是一旦完成,scatterv看起来就像之前一样:

int counts[4] = {1,1,1,1};   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7};   /* the starting point of everyone's data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT;   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);
Run Code Online (Sandbox Code Playgroud)

现在我们已经完成了一段时间的分散,聚集和MPI衍生类型.

下面是一个示例代码,它显示了带有字符数组的聚集和分散操作.运行程序:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
Run Code Online (Sandbox Code Playgroud)

并且代码如下.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) {
            for (int j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) {
        for (int j=0; j<gridsize/procgridsize; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

  • 这在某些版本中一次又一次出现; 我希望写一个答案,我们可以一直指着人们.但是谢谢:) (7认同)