Collective Communication in MPI: One to Many

So far we have seen only examples without communication. However, all practical MPI programs will make use of some form of interprocess communications. The simplest are global, or collective communications in which every process in a communicator group participate. Global communications can be one to many, many to one, or all to all.

In one-to-many collective communications, one process, generally called the root, sends a message to all other members of the communicator group. The buffer is read on the root and written to the recipients.

Broadcast

In a broadcast, the root process sends the same data to every other process. It is not required, but it is usual to make process 0 the root, since that is the only one guaranteed to be present. The buffer may be an array but all elements of the array are sent to every other process in the communicator group.

Broadcast

C++

The prototype is

int MPI_Bcast (void *buffer, int ncount, MPI_Datatype datatype, int root, MPI_Comm communicator);

In this prototype, buffer is the variable holding the data, ncount is the number of items (not bytes) sent, MPI_Datatype is a struct defined in mpi.h, and MPI_Comm is also a struct defined in mpi.h.

C++ Example

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs,rank;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    char message[]="I have a secret";

    MPI_Bcast(message,1,MPI_CHAR,0,MPI_COMM_WORLD);

    cout<<rank<<" "<<message<<endl;

    MPI_Finalize();

}
        


Fortran

<type>   :: vals
integer  :: ncount, root, err
! more code
call MPI_Bcast(vals, ncount, MPI_TYPE, root, MPI_COMM_WORLD, err)

The argument vals can be of any primitive type that corresponds to a supported MPI_TYPE.

Fortran Example

program broadcaster
    use mpi
    implicit none

    integer           :: nprocs, rank, ierr
    integer           :: sendcount
    character(len=15) :: message

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    message="I have a secret"

    call MPI_Bcast(message,1,MPI_CHAR,0,MPI_COMM_WORLD,ierr)

    print*, rank, message

    call MPI_Finalize(ierr)

end program 

Python

comm.bcast(data, root=0)  #general object
comm.Bcast([sendvals,MPI.TYPE], root=0)  #NumPy array

The pickled version does not use MPI.TYPE because a pickled object is a binary stream and mpi4py handles the data description passed to MPI. In the NumPy version we may specify the type, although it is optional because a Ndarray is aware of its type. The array and type should be a list.

Python Example

import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

message=np.array(["I have a secret"],dtype='str')

comm.Bcast([message,MPI.CHAR])

print(rank,message)

Scatter

A scatter breaks up an array and sends one part to each process, with root keeping its own part. The simplest function distributes the same ncount of the array to each process. The sections are distributed from the first element in rank order. If root=0 this means that it sends itself the first ncount elements and sends the next ncount elements to process 1, the ncount after that to process 2, and so forth. If root is not zero, that process sends the first ncount to rank 0 and so on, sends the rank-appropriate section to itself, then sends to the next until all processes in the communicator group have received data. For a simple Scatter, the number of elements of the “send buffer” should be divisible by ncount.

Scatter

In the root process, the send buffer must contain all the data to be distributed, so it is larger than receive buffer by a factor of $ncount \times nprocs$.

C++

int MPI_Scatter(void *sendbuffer, int ncount, MPI_Datatype datatype, void *recvbuffer, int ncount, MPI_Datatype datatype, int root, MPI_Comm communicator);
C++ Example

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs,rank;
    int sendcount, n;
    float values[100];

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    //For illustration only, don't hardcode nprocs
    if (100%nprocs !=0) {
	if (rank==0) {
            cout << "For this simple example, nprocs must evenly divide 100\n";
	}
	exit(1);
    }
    else {
        sendcount=100/nprocs;
    }

    for(int i=0;i<100;++i) {
        values[i]=i+1;
    }

    cout<<setprecision(1)<<fixed;

    float *myvals=new float[sendcount];

    MPI_Scatter(values,sendcount,MPI_FLOAT,myvals,sendcount,MPI_FLOAT,0,MPI_COMM_WORLD);

    //Forces each process to write separately and in order
    //Printing here is to demonstrate how the data are distributed
    for (n=0;n<nprocs;++n) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (n==rank) {
	    cout<<rank<<":";
	    for (int i=0;i<sendcount;++i) {
		cout<<" "<<myvals[i]<<" ";
	    }
	    cout<<"\n";
	}
    }

    MPI_Finalize();

}
        


Fortran

call MPI_Scatter(vals,ncount,MPI_TYPE,rvals,ncount,MPI_TYPE,root,MPI_COMM_WORLD,err)
Fortran Example

program scatterthem
    use mpi
    implicit none

    real, dimension(100) :: values
    real, dimension(:), allocatable :: myvals
    integer              :: rank, nprocs, ierr
    integer              :: sendcount, i, n

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    !For illustration only, don't hardcode nprocs
    if (mod(100,nprocs)/=0) then
        if (rank==0) then
            print*, "For this simple example, nprocs must evenly divide 100"
        endif
        call MPI_Finalize(ierr)
        stop 
    endif

    ! Only root has to know the global values
    if (rank==0) then
        do i=1,100
            values(i)=i
        enddo
    endif

    sendcount=100/nprocs
    allocate(myvals(sendcount))

    call MPI_Scatter(values,sendcount,MPI_REAL,myvals,sendcount,MPI_REAL,0,MPI_COMM_WORLD,ierr)

    !Forces each process to write separately and in order
    !Printing here is to demonstrate how the data are distributed
    do n=0,nprocs-1
        call MPI_Barrier(MPI_COMM_WORLD,ierr)
        if (n==rank) then
            write(6,'(i4,a)',advance='no') rank,":"
            do i=1,size(myvals)-1
                write(6,'(f5.0)',advance='no') myvals(i)
            enddo
            write(6,'(f5.0)') myvals(size(myvals))
        endif
    enddo

    call MPI_Finalize(ierr)

end program

Python

Both buffers should be Numpy arrays. The datatype is usually not required, and the root process is 0 by default, so that is an optional argument.

comm.Scatter([sendvals,MPI.DOUBLE],[recvals,MPI.DOUBLE,root=0)
Python Example

import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

if 100%nprocs!=0:
    if rank==0:
        print("For this simple example, nprocs must evenly divide 100")
    sys.exit()
else:
    sendcount=100//nprocs

values=np.linspace(1.,100.,100)
myvals=np.empty(sendcount)

comm.Scatter([values,sendcount,MPI.DOUBLE],myvals)

print(rank,myvals)

More General Scattering

The MPI_Scatter procedure allows only equal distribution of an entire array of values. If more general distribution is needed, the MPI_Scatterv (vector scatter) allows considerable flexibility. In this case, ncount becomes an integer array. Different ranks may receive different numbers of items. An integer displacement vector is included, so that elements can be skipped. Displacements are measured from the start of the array; each one is essentially the starting index of the block to be sent. (Fortran programmers, pay attention to 1-based versus 0-based computations.)

In the example codes we will assume 8 processes; generally we should not hard-code a number of processes, but this simplifies the arithmetic.

C/C++

int MPI_Scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcounts, MPI_Datatype recvtype, int root, MPI_Comm comm);
C++ Example

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs,rank;
    int n;
    float values[101];
    int displs[8];

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    //For illustration only, don't hardcode nprocs
    if (nprocs != 8) {
        cout << "Example requires 8 processes\n";
	exit(1);
    }

    for (int i=0;i<=100;++i) {
        values[i]=(float) i;
    }

    int nprow=101/nprocs;
    int leftover=101%nprocs;
    cout<<setprecision(1)<<fixed;
    if (rank == 0) {
        n=0;
        for (int i=0;i<nprocs;++i) {
	    for (int j=n;j<=n+nprow-1;++j) {
	        cout<<" "<<values[j]<<" ";
            }
           n=n+nprow;
	   cout<<"\n";
	}
	for (int i=n;i<=n+leftover-1;++i) {
	    cout<<values[i]<<" ";
        }
	cout<<"\n";
    }

    //Hand-distributing the numbers
    int sendcounts[]={12,12,11,12,13,9,10,8};
    int offsets[]={0,2,3,1,4,1,1,2};

    displs[0]=offsets[0];
    for (int i=1;i<nprocs;++i) {
        displs[i]=displs[i-1]+sendcounts[i-1]+offsets[i];
    }

    float *myvals=new float[sendcounts[rank]];

    MPI_Scatterv(values,sendcounts,displs,MPI_FLOAT,myvals,sendcounts[rank],MPI_FLOAT,0,MPI_COMM_WORLD);

    //Forces each process to write separately and in order
    //Printing here is to demonstrate how the data are distributed
    for (n=0;n<nprocs;++n) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (n==rank) {
	    cout<<rank;
	    for (int i=0;i<=sendcounts[rank]-1;++i) {
		cout<<" "<<myvals[i]<<" ";
	    }
	    cout<<"\n";
	}
    }

    MPI_Finalize();

}
        


Fortran

call MPI_SCATTERV(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcounts, recvtype, root, comm, ierr)
Fortran Example

program scatterthem
    use mpi
    implicit none

    real, dimension(101) :: values
    real, dimension(:), allocatable :: myvals
    integer, dimension(8):: sendcounts,offsets,displs
    integer              :: i,n,nprow
    integer              :: rank, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    !For illustration only, don't hardcode nprocs
    if (nprocs /= 8) then
        stop "Example requires 8 processes"
    endif

    do i=1,101
        values(i)=i-1
    enddo

    nprow=101/nprocs
    if (rank == 0) then
        n=1
        do i=1,nprocs
           write(6,'(20f5.0)') values(n:n+nprow-1)
           n=n+nprow
        enddo
        write(6,'(20f5.0)') values(n:)
    endif

    !Hand-distributing the numbers
    sendcounts=[12,12,11,12,13,9,10,8]
    offsets=[0,2,3,1,4,1,1,2]

    displs(1)=offsets(1)
    do i=2,nprocs
        displs(i)=displs(i-1)+sendcounts(i-1)+offsets(i)
    enddo

    allocate(myvals(sendcounts(rank+1)))

    call MPI_Scatterv(values,sendcounts,displs,MPI_REAL,myvals,sendcounts(rank+1),MPI_REAL,0,MPI_COMM_WORLD,ierr)

    !Forces each process to write separately and in order
    !Printing here is to demonstrate how the data are distributed
    do n=0,nprocs-1
        call MPI_Barrier(MPI_COMM_WORLD,ierr)
        if (n==rank) then
            write(6,'(i4)',advance='no') rank
            do i=1,size(myvals)-1
                write(6,'(f5.0)',advance='no') myvals(i)
            enddo
            write(6,'(f5.0)') myvals(size(myvals))
        endif
    enddo

    call MPI_Finalize(ierr)

end program
        


Python

comm.Scatterv([sendbuf,sendcounts,displs,MPI.TYPE],recvbuf)

Unlike many other mpi4py procedures, the MPI.TYPE may often be required for correct data transmission.

Python Example

import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

if 100%nprocs!=0:
    if rank==0:
        print("For this simple example, nprocs must evenly divide 100")
    sys.exit()
else:
    sendcount=100//nprocs

values=np.linspace(1.,100.,100)
myvals=np.empty(sendcount)

comm.Scatter([values,sendcount,MPI.DOUBLE],myvals)

print(rank,myvals)

Previous
Next