Global Communication in MPI: Many To Many
In many-to-many collective communications, every process in the communicator group sends a message to all the other processes.
Barrier
MPI_Barrier is used to synchronize processes: when it is invoked, each process pauses until all processes in the communicator group have called this function. It should be used sparingly, since it “serializes” a parallel program. Most of the collective communication routines contain an implicit barrier, so an explicit MPI_Barrier is not required.
C++
MPI_Barrier(MPI_Comm comm)
Fortran
call MPI_Barrier(comm, ierr)
Python
comm.Barrier()
MPI_Barrier is seldom needed in Python. For examples of its use in C++ and Fortran, see scatter.cxx and scatter.f90 below.
scatter.cxx
#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs, rank;
    int sendcount, n;
    float values[100];

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // For illustration only, don't hardcode nprocs
    if (100 % nprocs != 0) {
        if (rank == 0) {
            cout << "For this simple example, nprocs must evenly divide 100\n";
        }
        MPI_Finalize();
        return 1;
    }
    else {
        sendcount = 100 / nprocs;
    }

    for (int i = 0; i < 100; ++i) {
        values[i] = i + 1;
    }

    cout << setprecision(1) << fixed;

    float *myvals = new float[sendcount];

    MPI_Scatter(values, sendcount, MPI_FLOAT, myvals, sendcount, MPI_FLOAT, 0, MPI_COMM_WORLD);

    // Forces each process to write separately and in order
    // Printing here is to demonstrate how the data are distributed
    for (n = 0; n < nprocs; ++n) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (n == rank) {
            cout << rank << ":";
            for (int i = 0; i < sendcount; ++i) {
                cout << " " << myvals[i] << " ";
            }
            cout << "\n";
        }
    }

    MPI_Finalize();

}
scatter.f90
program scatterthem
    use mpi
    implicit none

    real, dimension(100) :: values
    real, dimension(:), allocatable :: myvals
    integer :: rank, nprocs, ierr
    integer :: sendcount, i, n

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    !For illustration only, don't hardcode nprocs
    if (mod(100,nprocs)/=0) then
        if (rank==0) then
            print*, "For this simple example, nprocs must evenly divide 100"
        endif
        call MPI_Finalize(ierr)
        stop
    endif

    ! Only root has to know the global values
    if (rank==0) then
        do i=1,100
            values(i)=i
        enddo
    endif

    sendcount=100/nprocs
    allocate(myvals(sendcount))

    call MPI_Scatter(values,sendcount,MPI_REAL,myvals,sendcount,MPI_REAL,0,MPI_COMM_WORLD,ierr)

    !Forces each process to write separately and in order
    !Printing here is to demonstrate how the data are distributed
    do n=0,nprocs-1
        call MPI_Barrier(MPI_COMM_WORLD,ierr)
        if (n==rank) then
            write(6,'(i4,a)',advance='no') rank,":"
            do i=1,size(myvals)-1
                write(6,'(f5.0)',advance='no') myvals(i)
            enddo
            write(6,'(f5.0)') myvals(size(myvals))
        endif
    enddo

    call MPI_Finalize(ierr)

end program
In these examples, MPI_Barrier is used in a loop to force the output from the processes to appear separately and in rank order. Upon entry into the loop, all processes synchronize at the barrier before executing the loop body. The process whose rank matches the loop variable writes its output, while the other processes skip to the next iteration. There they must wait at the barrier until the process doing the writing finishes and calls MPI_Barrier itself.
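Although MPI_Barrier is seldom needed in Python, the same ordered-printing idiom can be expressed with mpi4py. The following is a minimal sketch, not part of the original examples; the array myvals is just illustrative data.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
rank = comm.Get_rank()

# Illustrative local data on each rank
myvals = (rank+1)*np.linspace(1., 10., 10)

# Force each process to print separately and in rank order
for n in range(nprocs):
    comm.Barrier()
    if n == rank:
        print(rank, myvals, flush=True)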
Exercise
Write a program that generates an array of values from 1 to 10 only on the root process. Broadcast the array to each process. Print the array from each process.
Test it with four processes on the frontend or on your workstation.
C++
#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs, rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Space must be allocated on every rank
    int *values = new int[10];

    // Only the root process generates the values
    if (rank == 0) {
        for (int i = 1; i <= 10; ++i) {
            values[i-1] = i;
        }
    }

    MPI_Bcast(values, 10, MPI_INT, 0, MPI_COMM_WORLD);

    cout << rank << " " << values[0];
    for (int i = 1; i < 9; ++i) {
        cout << " " << values[i];
    }
    cout << " " << values[9] << endl;

    MPI_Finalize();

}
Fortran
program broadcaster
    use mpi
    implicit none

    integer :: nprocs, rank, ierr
    integer, dimension(:), allocatable :: values
    integer :: i

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    ! Space must be allocated on every rank
    allocate(values(10))

    ! Only the root process generates the values
    if ( rank==0 ) then
        values=[(i,i=1,10)]
    endif

    call MPI_Bcast(values,size(values),MPI_INTEGER,0,MPI_COMM_WORLD,ierr)

    write(*,'(11i5)') rank, values

    call MPI_Finalize(ierr)

end program
Python
import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

#Must declare space on all ranks. Make sure the dtypes match:
#np.intc corresponds to a C int, i.e. MPI.INT.
values=np.empty(10,dtype=np.intc)

if rank==0:
    values=np.arange(1,11,1,dtype=np.intc)

comm.Bcast([values,MPI.INT])

print(rank,values)
Allreduce
As the examples in the previous chapter demonstrated, when MPI_Reduce is called, only the root process knows the result. This can lead to bugs, since it is fairly common that all processes need the global value. The MPI_Allreduce procedure performs the reduction and distributes the result to every process. It should always be used rather than a Reduce followed by a Bcast, since the implementation can carry this out more efficiently than an explicit broadcast.
The syntax of MPI_Allreduce is identical to that of MPI_Reduce, but with the root argument omitted.
C++
int MPI_Allreduce(void *operand, void *result, int ncount, MPI_Datatype type, MPI_Op op, MPI_Comm comm);
Fortran
call MPI_ALLREDUCE(sendbuf, recvbuf, ncount, datatype, op, comm, ierr)
Python
comm.Allreduce(sendarr, recvarr, operation)
Exercise
Modify the example reduction code in your language of choice to perform an Allreduce.
C++ Solution
#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int ncount, nprocs, rank;
    float total;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    ncount = 100;

    cout << setprecision(1) << fixed;

    float *myvals = new float[ncount];
    // load data into local arrays
    myvals[0] = rank + 1;
    for (int i = 1; i < ncount; ++i) {
        myvals[i] = myvals[i-1] + rank + 1.;
    }

    float mysum = 0.0;
    for (int i = 0; i < ncount; ++i) {
        mysum += myvals[i];
    }

    MPI_Allreduce(&mysum, &total, 1, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);

    for (int n = 0; n < nprocs; ++n) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (n == rank) {
            cout << rank << ":" << mysum << " " << total << endl;
        }
    }

    MPI_Finalize();

}
Fortran Solution
program reduction
    use mpi
    implicit none

    real, dimension(:), allocatable :: myvals
    real :: mysum, total
    integer :: i,n,nprow
    integer :: rank, ncount, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    ncount=100
    allocate(myvals(ncount))

    ! Load data into local array
    myvals(1)=1
    do i=2,ncount
        myvals(i)=myvals(i-1)+1.
    enddo
    myvals=(rank+1)*myvals

    !Local sum
    mysum=sum(myvals)

    call MPI_Allreduce(mysum,total,1,MPI_REAL,MPI_SUM,MPI_COMM_WORLD,ierr)

    print*,rank,mysum,total

    call MPI_Finalize(ierr)

end program
Python Solution
import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

num=100
myvals=(rank+1)*np.linspace(1.,num,int(num))

#Add the numbers locally
mysum=myvals.sum()

#Get the grand total. The upper-case Allreduce requires buffer-like
#objects, so wrap the local scalar sum in a one-element array.
total=np.zeros(1)
comm.Allreduce(np.array([mysum]),total,op=MPI.SUM)

print(rank,mysum,total)
Allgather/Allgatherv
An allgather is the same as a gather, but each process sends its data into the receive buffer of every other process. As with Reduce/Allreduce, the syntax is essentially the same as for gather/gatherv, but with no root process specified. For Allgather, the global result array must be allocated on every process.
C++
int MPI_Allgather(void *sendbuffer, int sendcount, MPI_Datatype sendtype, void *recvbuffer, int recvcount, MPI_Datatype recvtype, MPI_Comm communicator)
Fortran
<type><dimension><size> :: vars
<type><dimension><size> :: all_vars
integer :: ncount, err
! more code
call MPI_ALLGATHER(vars, ncount, MPI_TYPE, all_vars, ncount, MPI_TYPE, MPI_COMM_WORLD, err)
Python
As before, for this syntax, both buffers should be NumPy arrays.
comm.Allgather([data,MPI.TYPE],[all_data,MPI.TYPE])
Exercise
Modify the example gather code in your language of choice to perform an Allgather.
C++ Solution
#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int ncount, nprocs, rank;
    int n;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    ncount = 10;
    float *allvals = new float[ncount*nprocs];

    cout << setprecision(1) << fixed;

    float *myvals = new float[ncount];
    // load data into local arrays
    myvals[0] = (rank+1);
    for (int i = 1; i < ncount; ++i) {
        myvals[i] = myvals[i-1] + rank + 1.;
    }

    MPI_Allgather(myvals, ncount, MPI_FLOAT, allvals, ncount, MPI_FLOAT, MPI_COMM_WORLD);

    int nprow = ncount;
    if (rank == 0) {
        n = 0;
        for (int i = 0; i <= nprocs-1; ++i) {
            for (int j = n; j <= n+nprow-1; ++j) {
                cout << " " << allvals[j] << " ";
            }
            n = n + nprow;
            cout << "\n";
        }
        cout << "\n";
    }

    MPI_Finalize();

}
Fortran Solution
program gatherthem
    use mpi
    implicit none

    real, dimension(:), allocatable :: myvals
    real, dimension(:), allocatable :: allvals
    integer :: i,n,nprow
    integer :: rank, ncount, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    ncount=10
    allocate(allvals(nprocs*ncount))
    allocate(myvals(ncount))

    ! Load data into local array
    myvals(1)=(rank+1)
    do i=2,ncount
        myvals(i)=myvals(i-1)+rank+1.
    enddo

    call MPI_Allgather(myvals,ncount,MPI_REAL,allvals,ncount,MPI_REAL,MPI_COMM_WORLD,ierr)

    nprow=ncount
    if (rank == 0) then
        n=1
        do i=1,nprocs
            write(6,'(20f6.0)') allvals(n:n+nprow-1)
            n=n+nprow
        enddo
    endif

    call MPI_Finalize(ierr)

end program
Python Solution
import sys
import numpy as np
from mpi4py import MPI
comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()
myvals=(rank+1)*np.linspace(1.,10.,10)
print(rank,myvals)
sendcount=myvals.size
allvals=np.zeros(sendcount*nprocs)
comm.Allgather([myvals,MPI.DOUBLE],[allvals,MPI.DOUBLE])
print(rank,allvals)
Alltoall
In MPI_Alltoall, each process sends data to every other process. Let us consider the simplest case, when each process sends one item to every other process. Suppose there are three processes and rank 0 has an array containing the values [0,1,2], rank 1 has [10,11,12], and rank 2 has [20,21,22]. Rank 0 keeps (or sends to itself) the 0 value, sends 1 to rank 1, and 2 to rank 2. Rank 1 sends 10 to rank 0, keeps 11, and sends 12 to rank 2. Rank 2 sends 20 to rank 0, 21 to rank 1, and keeps 22. After the call, rank 0 holds [0,10,20], rank 1 holds [1,11,21], and rank 2 holds [2,12,22]; the operation is effectively a transpose of the data across the ranks.
C++
alltoall.cxx
#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs, rank;
    int n;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    float *recvals = new float[nprocs];

    cout << setprecision(1) << fixed;

    float *myvals = new float[nprocs];
    // load data into local arrays
    myvals[0] = (rank+1)*100.;
    for (int i = 1; i < nprocs; ++i) {
        myvals[i] = myvals[i-1] + 1.;
    }

    MPI_Alltoall(myvals, 1, MPI_FLOAT, recvals, 1, MPI_FLOAT, MPI_COMM_WORLD);

    for (int n = 0; n < nprocs; ++n) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (n == rank) {
            for (int i = 0; i < nprocs; ++i) {
                cout << rank << ":" << recvals[i] << " ";
            }
            cout << "\n";
        }
    }

    MPI_Finalize();

}
Fortran
alltoall.f90
program everybodyskate
    use mpi
    implicit none

    real, dimension(:), allocatable :: myvals
    real, dimension(:), allocatable :: recvals
    integer :: i
    integer :: rank, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    allocate(myvals(nprocs))
    myvals(1)=(rank+1)*100.
    do i=2,nprocs
        myvals(i)=myvals(i-1)+1.
    enddo

    allocate(recvals(nprocs))

    call MPI_Alltoall(myvals,1,MPI_REAL,recvals,1,MPI_REAL,MPI_COMM_WORLD,ierr)

    call MPI_Finalize(ierr)

end program
Python
alltoall.py
import sys
import numpy as np
from mpi4py import MPI
comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()
mystart=rank*100.
myend=mystart+nprocs
myvals=np.arange(mystart,myend,1.)
# Receive buffer
recvals=np.zeros(myvals.size)
comm.Alltoall(myvals,recvals)
print(rank,recvals)
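The three-process scenario described at the start of this section, with the values [0,1,2], [10,11,12], and [20,21,22], can be reproduced with a small variation of the Python example above. This is a sketch intended to be run with exactly three processes; the variable names are illustrative.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
rank = comm.Get_rank()

# With three ranks: rank 0 holds [0,1,2], rank 1 holds [10,11,12], rank 2 holds [20,21,22]
myvals = 10.*rank + np.arange(nprocs, dtype='d')
recvals = np.zeros(nprocs)

# Each process sends one item to every process, including itself
comm.Alltoall(myvals, recvals)

# Rank 0 prints [0,10,20], rank 1 prints [1,11,21], rank 2 prints [2,12,22]
print(rank, recvals)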
Two more general forms of alltoall exist: MPI_Alltoallv, which is similar to MPI_Allgatherv in that varying data sizes and displacements are allowed, and MPI_Alltoallw, which is even more general in that the ‘chunks’ of data on the processes can be of different datatypes. These procedures are beyond our scope, but the interested reader can consult the documentation.
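For orientation only, here is a minimal mpi4py sketch of what an Alltoallv call looks like. In this sketch rank r sends r+1 items to every rank; the counts, displacements, and data values are purely illustrative and not taken from the original examples.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
rank = comm.Get_rank()

# Rank r sends (r+1) copies of the value 10*r+d to destination rank d
sendcounts = np.full(nprocs, rank+1, dtype='i')
sdispls = np.arange(nprocs, dtype='i')*(rank+1)
sendbuf = np.repeat(10.*rank + np.arange(nprocs), rank+1)

# Rank r receives (s+1) items from each source rank s
recvcounts = np.arange(1, nprocs+1, dtype='i')
rdispls = np.zeros(nprocs, dtype='i')
rdispls[1:] = np.cumsum(recvcounts)[:-1]
recvbuf = np.zeros(recvcounts.sum())

comm.Alltoallv([sendbuf, sendcounts, sdispls, MPI.DOUBLE],
               [recvbuf, recvcounts, rdispls, MPI.DOUBLE])

print(rank, recvbuf)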
MPI_IN_PLACE
We often do not need one of the buffers once the message has been communicated, and allocating two buffers wastes memory and requires some amount of unneeded communication. MPI collective procedures therefore accept the special buffer MPI_IN_PLACE. This special value can be used instead of the receive buffer in Scatter and Scatterv; in the other collective functions it takes the place of the send buffer. In the rooted collectives (Scatter, Gather, Reduce, and so forth) only the root passes MPI_IN_PLACE, while in the all-to-all variants (Allreduce, Allgather, Alltoall) every process passes it. The expected send and receive buffers must be the same size for this to be valid. As usual for mpi4py, the Python name of the variable is MPI.IN_PLACE.
Examples
C++
MPI_Scatter(sendbuf, ncount, MPI_Datatype, MPI_IN_PLACE, ncount, MPI_Datatype, root, MPI_COMM_WORLD);
MPI_Reduce(MPI_IN_PLACE, recvbuf, ncount, MPI_Datatype, MPI_Op, root, MPI_COMM_WORLD);
Fortran
call MPI_Scatter(vals, ncount, MPI_TYPE, MPI_IN_PLACE, ncount, MPI_TYPE, root, MPI_COMM_WORLD, ierr)
call MPI_REDUCE(MPI_IN_PLACE, recvbuf, ncount, MPI_TYPE, MPI_Op, root, MPI_COMM_WORLD, ierr)
Python
comm.Scatter([sendvals,MPI.DOUBLE],MPI.IN_PLACE,root=0)
comm.Reduce(MPI.IN_PLACE, recvarr, operation, root=0)
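As a concrete, runnable illustration, here is a minimal mpi4py sketch of an in-place Allreduce, in which every rank's local value is overwritten by the global total; the array name total is illustrative.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# One-element buffer holding this rank's contribution
total = np.array([float(rank+1)])

# With MPI.IN_PLACE as the send buffer, the receive buffer supplies the
# input and is overwritten with the reduced result on every rank
comm.Allreduce(MPI.IN_PLACE, total, op=MPI.SUM)

print(rank, total)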