Collective Communication in MPI: One to Many
So far we have seen only examples without communication. However, all practical MPI programs make use of some form of interprocess communication. The simplest communications are global, or collective, communications, in which every process in a communicator group participates. Global communications can be one to many, many to one, or all to all.
In one-to-many collective communications, one process, generally called the root, sends a message to all other members of the communicator group. The buffer is read on the root and written to the recipients.
Broadcast
In a broadcast, the root process sends the same data to every other process. It is not required, but it is usual to make process 0 the root, since that is the only one guaranteed to be present. The buffer may be an array but all elements of the array are sent to every other process in the communicator group.
C++
The prototype is
int MPI_Bcast (void *buffer, int ncount, MPI_Datatype datatype, int root, MPI_Comm communicator);
In this prototype, buffer is the variable holding the data, ncount is the number of items (not bytes) sent, MPI_Datatype is an opaque type defined in mpi.h, and MPI_Comm is also an opaque type defined in mpi.h.
C++ Example
#include <iostream>
#include <iomanip>
#include <cstring>
#include "mpi.h"
using namespace std;
int main(int argc, char *argv[]) {
int nprocs,rank;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
char message[16];
if (rank==0) {
strcpy(message,"I have a secret"); //only the root process knows the secret
}
MPI_Bcast(message,16,MPI_CHAR,0,MPI_COMM_WORLD); //send all 16 bytes, including the terminating null
cout<<rank<<" "<<message<<endl;
MPI_Finalize();
}
Fortran
<type> :: vals
integer :: ncount, root, err
! more code
call MPI_Bcast(vals, ncount, MPI_TYPE, root, MPI_COMM_WORLD, err)
The argument vals can be of any primitive type that corresponds to a supported MPI_TYPE.
Fortran Example
program broadcaster
use mpi
implicit none
integer :: nprocs, rank, ierr
integer :: sendcount
character(len=15) :: message
call MPI_Init(ierr)
call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)
message="I have a secret"
call MPI_Bcast(message,1,MPI_CHAR,0,MPI_COMM_WORLD,ierr)
print*, rank, message
call MPI_Finalize(ierr)
end program
Python
data=comm.bcast(data, root=0) #general object; the result must be assigned
comm.Bcast([sendvals,MPI.TYPE], root=0) #NumPy array
The pickled version does not use MPI.TYPE because a pickled object is a binary stream, and mpi4py handles the data description passed to MPI. In the NumPy version we may specify the type, although it is optional because an ndarray is aware of its type. The array and its type are passed together as a list.
Python Example
import sys
import numpy as np
from mpi4py import MPI
comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()
#Only the root process knows the secret; the others allocate an empty buffer
if rank==0:
    message=np.array(["I have a secret"],dtype='U15')
else:
    message=np.empty(1,dtype='U15')
comm.Bcast([message,MPI.CHAR],root=0)
print(rank,message)
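For comparison, here is a minimal sketch of the pickled bcast form described above (the dictionary payload is purely illustrative). Note that bcast returns the broadcast object, so the result must be assigned on every rank.
from mpi4py import MPI
comm=MPI.COMM_WORLD
rank=comm.Get_rank()
#Only root defines the object; the other ranks start with None
if rank==0:
    data={"secret":"I have a secret","value":42}
else:
    data=None
#bcast returns the broadcast object, so assign the result on every rank
data=comm.bcast(data,root=0)
print(rank,data)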
Scatter
A scatter breaks up an array and sends one part to each process, with the root keeping its own part. The simplest function distributes the same number ncount of elements to each process. The sections are distributed from the first element in rank order: if root is 0, it keeps the first ncount elements for itself, sends the next ncount elements to process 1, the ncount after that to process 2, and so forth. If root is not zero, that process sends the first ncount elements to rank 0, keeps the rank-appropriate section for itself, and continues in rank order until every process in the communicator group has received data. For a simple Scatter, the number of elements in the send buffer must be evenly divisible by the number of processes.
In the root process, the send buffer must contain all the data to be distributed, so its size is $ncount \times nprocs$ elements; it is larger than the receive buffer by a factor of nprocs.
C++
int MPI_Scatter(void *sendbuffer, int sendcount, MPI_Datatype sendtype, void *recvbuffer, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm communicator);
C++ Example
#include <iostream>
#include <iomanip>
#include "mpi.h"
using namespace std;
int main(int argc, char *argv[]) {
int nprocs,rank;
int sendcount, n;
float values[100];
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
//For illustration only, don't hardcode nprocs
if (100%nprocs != 0) {
if (rank==0) {
cout << "For this simple example, nprocs must evenly divide 100\n";
}
//Shut down MPI cleanly before exiting
MPI_Finalize();
return 1;
}
else {
sendcount=100/nprocs;
}
for(int i=0;i<100;++i) {
values[i]=i+1;
}
cout<<setprecision(1)<<fixed;
float *myvals=new float[sendcount];
MPI_Scatter(values,sendcount,MPI_FLOAT,myvals,sendcount,MPI_FLOAT,0,MPI_COMM_WORLD);
//Forces each process to write separately and in order
//Printing here is to demonstrate how the data are distributed
for (n=0;n<nprocs;++n) {
MPI_Barrier(MPI_COMM_WORLD);
if (n==rank) {
cout<<rank<<":";
for (int i=0;i<sendcount;++i) {
cout<<" "<<myvals[i]<<" ";
}
cout<<"\n";
}
}
MPI_Finalize();
}
Fortran
call MPI_Scatter(vals,ncount,MPI_TYPE,rvals,ncount,MPI_TYPE,root,MPI_COMM_WORLD,err)
Fortran Example
program scatterthem
use mpi
implicit none
real, dimension(100) :: values
real, dimension(:), allocatable :: myvals
integer :: rank, nprocs, ierr
integer :: sendcount, i, n
call MPI_Init(ierr)
call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)
!For illustration only, don't hardcode nprocs
if (mod(100,nprocs)/=0) then
if (rank==0) then
print*, "For this simple example, nprocs must evenly divide 100"
endif
call MPI_Finalize(ierr)
stop
endif
! Only root has to know the global values
if (rank==0) then
do i=1,100
values(i)=i
enddo
endif
sendcount=100/nprocs
allocate(myvals(sendcount))
call MPI_Scatter(values,sendcount,MPI_REAL,myvals,sendcount,MPI_REAL,0,MPI_COMM_WORLD,ierr)
!Forces each process to write separately and in order
!Printing here is to demonstrate how the data are distributed
do n=0,nprocs-1
call MPI_Barrier(MPI_COMM_WORLD,ierr)
if (n==rank) then
write(6,'(i4,a)',advance='no') rank,":"
do i=1,size(myvals)-1
write(6,'(f5.0)',advance='no') myvals(i)
enddo
write(6,'(f5.0)') myvals(size(myvals))
endif
enddo
call MPI_Finalize(ierr)
end program
Python
Both buffers should be NumPy arrays. The datatype is usually not required, and the root process is 0 by default, so it is an optional argument.
comm.Scatter([sendvals,MPI.DOUBLE],[recvals,MPI.DOUBLE],root=0)
Python Example
import sys
import numpy as np
from mpi4py import MPI
comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()
if 100%nprocs!=0:
    if rank==0:
        print("For this simple example, nprocs must evenly divide 100")
    sys.exit()
else:
    sendcount=100//nprocs
values=np.linspace(1.,100.,100)
myvals=np.empty(sendcount)
comm.Scatter([values,sendcount,MPI.DOUBLE],myvals)
print(rank,myvals)
More General Scattering
The MPI_Scatter procedure allows only equal distribution of an entire array of values. If a more general distribution is needed, MPI_Scatterv (vector scatter) allows considerable flexibility. In this case, ncount becomes an integer array. Different ranks may receive different numbers of items, and an integer displacement vector is included so that elements can be skipped. Displacements are measured from the start of the array; each one is essentially the starting index of the block to be sent. (Fortran programmers, pay attention to 1-based versus 0-based computations.)
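As a small sketch of the displacement arithmetic, using made-up counts and gaps rather than those in the examples below: each displacement is the previous displacement plus the previous count plus the number of elements to skip.
#Hypothetical counts per rank and gaps to skip before each rank's block
sendcounts=[3,2,4]
gaps=[0,1,2]
#Each displacement is the 0-based starting index of that rank's block
displs=[gaps[0]]
for i in range(1,len(sendcounts)):
    displs.append(displs[i-1]+sendcounts[i-1]+gaps[i])
print(displs)   #prints [0, 4, 8]: the blocks start at indices 0, 4, and 8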
In the example codes we will assume 8 processes; generally we should not hard-code a number of processes, but this simplifies the arithmetic.
C/C++
int MPI_Scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
C++ Example
#include <iostream>
#include <iomanip>
#include "mpi.h"
using namespace std;
int main(int argc, char *argv[]) {
int nprocs,rank;
int n;
float values[101];
int displs[8];
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
//For illustration only, don't hardcode nprocs
if (nprocs != 8) {
if (rank==0) {
cout << "Example requires 8 processes\n";
}
//Shut down MPI cleanly before exiting
MPI_Finalize();
return 1;
}
for (int i=0;i<=100;++i) {
values[i]=(float) i;
}
int nprow=101/nprocs;
int leftover=101%nprocs;
cout<<setprecision(1)<<fixed;
if (rank == 0) {
n=0;
for (int i=0;i<nprocs;++i) {
for (int j=n;j<=n+nprow-1;++j) {
cout<<" "<<values[j]<<" ";
}
n=n+nprow;
cout<<"\n";
}
for (int i=n;i<=n+leftover-1;++i) {
cout<<values[i]<<" ";
}
cout<<"\n";
}
//Hand-distributing the numbers
int sendcounts[]={12,12,11,12,13,9,10,8};
int offsets[]={0,2,3,1,4,1,1,2};
displs[0]=offsets[0];
for (int i=1;i<nprocs;++i) {
displs[i]=displs[i-1]+sendcounts[i-1]+offsets[i];
}
float *myvals=new float[sendcounts[rank]];
MPI_Scatterv(values,sendcounts,displs,MPI_FLOAT,myvals,sendcounts[rank],MPI_FLOAT,0,MPI_COMM_WORLD);
//Forces each process to write separately and in order
//Printing here is to demonstrate how the data are distributed
for (n=0;n<nprocs;++n) {
MPI_Barrier(MPI_COMM_WORLD);
if (n==rank) {
cout<<rank;
for (int i=0;i<=sendcounts[rank]-1;++i) {
cout<<" "<<myvals[i]<<" ";
}
cout<<"\n";
}
}
MPI_Finalize();
}
Fortran
call MPI_SCATTERV(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcounts, recvtype, root, comm, ierr)
Fortran Example
program scatterthem
use mpi
implicit none
real, dimension(101) :: values
real, dimension(:), allocatable :: myvals
integer, dimension(8):: sendcounts,offsets,displs
integer :: i,n,nprow
integer :: rank, nprocs, ierr
call MPI_Init(ierr)
call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)
!For illustration only, don't hardcode nprocs
if (nprocs /= 8) then
if (rank==0) then
print*, "Example requires 8 processes"
endif
call MPI_Finalize(ierr)
stop
endif
do i=1,101
values(i)=i-1
enddo
nprow=101/nprocs
if (rank == 0) then
n=1
do i=1,nprocs
write(6,'(20f5.0)') values(n:n+nprow-1)
n=n+nprow
enddo
write(6,'(20f5.0)') values(n:)
endif
!Hand-distributing the numbers
sendcounts=[12,12,11,12,13,9,10,8]
offsets=[0,2,3,1,4,1,1,2]
displs(1)=offsets(1)
do i=2,nprocs
displs(i)=displs(i-1)+sendcounts(i-1)+offsets(i)
enddo
allocate(myvals(sendcounts(rank+1)))
call MPI_Scatterv(values,sendcounts,displs,MPI_REAL,myvals,sendcounts(rank+1),MPI_REAL,0,MPI_COMM_WORLD,ierr)
!Forces each process to write separately and in order
!Printing here is to demonstrate how the data are distributed
do n=0,nprocs-1
call MPI_Barrier(MPI_COMM_WORLD,ierr)
if (n==rank) then
write(6,'(i4)',advance='no') rank
do i=1,size(myvals)-1
write(6,'(f5.0)',advance='no') myvals(i)
enddo
write(6,'(f5.0)') myvals(size(myvals))
endif
enddo
call MPI_Finalize(ierr)
end program
Python
comm.Scatterv([sendbuf,sendcounts,displs,MPI.TYPE],recvbuf)
Unlike in many other mpi4py procedures, the MPI.TYPE specification is often required here for correct data transmission.
Python Example
import sys
import numpy as np
from mpi4py import MPI
comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()
#For illustration only, don't hardcode nprocs
if nprocs!=8:
    if rank==0:
        print("Example requires 8 processes")
    sys.exit()
values=np.arange(101.)
#Hand-distributing the numbers
sendcounts=[12,12,11,12,13,9,10,8]
offsets=[0,2,3,1,4,1,1,2]
displs=[offsets[0]]
for i in range(1,nprocs):
    displs.append(displs[i-1]+sendcounts[i-1]+offsets[i])
myvals=np.empty(sendcounts[rank])
comm.Scatterv([values,sendcounts,displs,MPI.DOUBLE],myvals)
print(rank,myvals)