Collective Communication in MPI: Many To One
In many-to-one collective communications, all processes in the communicator group send data to the root process. The send buffer is read on each sender and the receive buffer is written on the root.
Gather
A gather is the inverse of a scatter. Each process sends ncount items to the root process, which assembles them in rank order in its receive buffer. That buffer must therefore be large enough to hold ncount items from every process. As with a scatter, the MPI_Datatype is specified twice and should be the same in both places. Note that ncount is the number of items sent per process, not the total.
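The net effect is as if the root received each rank's chunk separately and copied chunk r into the slice of its receive buffer starting at element r*ncount. The mpi4py sketch below (variable names are illustrative, not part of any MPI interface) spells that out with explicit point-to-point calls; the single gather call used in the examples that follow does the same thing in one step.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
root = 0
ncount = 4

myvals = np.full(ncount, float(rank))            # each rank's local chunk

if rank == root:
    allvals = np.empty(ncount*nprocs)            # must hold ncount items from every process
    for r in range(nprocs):
        if r == root:
            allvals[r*ncount:(r+1)*ncount] = myvals              # root keeps its own chunk in place
        else:
            comm.Recv(allvals[r*ncount:(r+1)*ncount], source=r)  # chunk r starts at element r*ncount
    print(allvals)                               # rank 0's chunk first, then rank 1's, and so on
else:
    comm.Send(myvals, dest=root)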
C++
The prototype is
int MPI_Gather(void *sendbuffer, int sendcount, MPI_Datatype sendtype, void *recvbuffer, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm communicator)
C++ Example
#include <iostream>
#include <iomanip>
#include "mpi.h"
using namespace std;

int main(int argc, char *argv[]) {

    int ncount,nprocs,rank;
    int n;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    ncount=10;

    float *allvals=new float[ncount*nprocs];

    cout<<setprecision(1)<<fixed;

    float *myvals=new float[ncount];
    // load data into local arrays
    myvals[0]=(rank+1);
    for (int i=1;i<ncount;++i) {
        myvals[i]=myvals[i-1]+rank+1.;
    }

    MPI_Gather(myvals,ncount,MPI_FLOAT,allvals,ncount,MPI_FLOAT,0,MPI_COMM_WORLD);

    int nprow=ncount;
    if (rank == 0) {
        n=0;
        for (int i=0;i<=nprocs-1;++i) {
            for (int j=n;j<=n+nprow-1;++j) {
                cout<<" "<<allvals[j]<<" ";
            }
            n=n+nprow;
            cout<<"\n";
        }
        cout<<"\n";
    }

    MPI_Finalize();

}
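If the program above is built with the MPI C++ wrapper (often mpicxx) into an executable named, say, gather_demo (the name is arbitrary), a run with three processes prints output along these lines, since rank r contributes the multiples of r+1 from r+1 up to 10(r+1):

mpiexec -np 3 ./gather_demo
 1.0  2.0  3.0  4.0  5.0  6.0  7.0  8.0  9.0  10.0
 2.0  4.0  6.0  8.0  10.0  12.0  14.0  16.0  18.0  20.0
 3.0  6.0  9.0  12.0  15.0  18.0  21.0  24.0  27.0  30.0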
Fortran
<type>, dimension(<size>) :: vars
<type>, dimension(<size>) :: all_vars
integer :: ncount, root, err
! more code
call MPI_Gather(vars, ncount, MPI_TYPE, all_vars, ncount, MPI_TYPE, root, MPI_COMM_WORLD, err)
Fortran Example
program gatherthem
    use mpi
    implicit none
    real, dimension(:), allocatable :: myvals
    real, dimension(:), allocatable :: allvals
    integer :: i,n,nprow
    integer :: rank, ncount, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    ncount=10

    if (rank==0) then
        allocate(allvals(nprocs*ncount))
    else
        ! The receive buffer is ignored on non-root ranks, but the actual
        ! argument must still be allocated, so give it a token size.
        allocate(allvals(1))
    endif

    allocate(myvals(ncount))
    ! Load data into local array
    myvals(1)=(rank+1)
    do i=2,ncount
        myvals(i)=myvals(i-1)+rank+1.
    enddo

    call MPI_Gather(myvals,ncount,MPI_REAL,allvals,ncount,MPI_REAL,0,MPI_COMM_WORLD,ierr)

    nprow=ncount
    if (rank == 0) then
        n=1
        do i=1,nprocs
            write(6,'(20f6.0)') allvals(n:n+nprow-1)
            n=n+nprow
        enddo
    endif

    call MPI_Finalize(ierr)

end program
Python
For this syntax, both buffers should be NumPy arrays.
comm.Gather([data,MPI.TYPE],[all_data,MPI.TYPE], root=0)
Python Example
import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

myvals=(rank+1)*np.linspace(1.,10.,10)
print(rank,myvals)

sendcount=myvals.size
allvals=np.zeros(sendcount*nprocs)

comm.Gather([myvals,MPI.DOUBLE],[allvals,MPI.DOUBLE])

if (rank==0):
    print(allvals)
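Assuming the script above is saved as gather.py (any name works), it can be launched with, for example,

mpiexec -np 3 python gather.py

Each rank prints its own myvals, and since those lines come from separate processes they may appear in any order; the gathered array is printed only by rank 0.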
Gatherv
MPI_Gatherv is the inverse of MPI_Scatterv. Each process can contribute a different number of items, and the chunks are assembled into a global array on the root process. The recvcounts argument holds the number of items expected from each rank, and displs holds the offset in the receive buffer at which each rank's chunk begins. For C++ and Fortran, sendcount is an integer, but recvcounts and displs are integer arrays.
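In the common case where the chunks should simply land back to back on the root, displs is just the running (exclusive) sum of recvcounts. Below is a minimal mpi4py sketch of that pattern, with illustrative variable names; each rank sends rank+1 copies of its rank number.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

mycount = rank + 1                               # a different count on every rank
myvals = np.full(mycount, float(rank))

recvcounts = np.arange(1, nprocs+1)              # rank r sends r+1 items
displs = np.zeros(nprocs, dtype=int)
displs[1:] = np.cumsum(recvcounts)[:-1]          # chunk r starts right after chunks 0..r-1

allvals = np.zeros(recvcounts.sum())             # only really needed on the root
comm.Gatherv([myvals, MPI.DOUBLE], [allvals, recvcounts, displs, MPI.DOUBLE], root=0)

if rank == 0:
    print(allvals)                               # 0., 1., 1., 2., 2., 2., ... in rank order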
C/C++
int MPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm);
C++ Example
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include "mpi.h"
using namespace std;

int main(int argc, char *argv[]) {

    int nprocs,rank;
    int n;
    float values[101], allvals[101];
    int displs[8];

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    //For illustration only, don't hardcode nprocs
    if (nprocs != 8) {
        cout << "Example requires 8 processes\n";
        exit(1);
    }

    for (int i=0;i<=100;++i) {
        values[i]=(float) i;
        allvals[i]=0.;
    }

    cout<<setprecision(1)<<fixed;

    //Hand-distributing the numbers
    int sendcounts[]={12,12,11,12,13,9,10,8};
    int offsets[]={0,2,3,1,4,1,1,2};

    displs[0]=offsets[0];
    for (int i=1;i<nprocs;++i) {
        displs[i]=displs[i-1]+sendcounts[i-1]+offsets[i];
    }

    float *myvals=new float[sendcounts[rank]];
    // load data into local arrays
    int start_index=displs[rank];
    for (int i=0;i<sendcounts[rank];++i) {
        myvals[i]=values[start_index+i];
    }

    MPI_Gatherv(myvals,sendcounts[rank],MPI_FLOAT,allvals,sendcounts,displs,MPI_FLOAT,0,MPI_COMM_WORLD);

    int nprow=101/nprocs;
    int leftover=101%nprocs;

    if (rank == 0) {
        n=0;
        for (int i=0;i<nprocs;++i) {
            for (int j=n;j<=n+nprow-1;++j) {
                cout<<" "<<allvals[j]<<" ";
            }
            n=n+nprow;
            cout<<"\n";
        }
        for (int i=n;i<=n+leftover-1;++i) {
            cout<<allvals[i]<<" ";
        }
        cout<<"\n";
    }

    MPI_Finalize();

}
Fortran
call MPI_GATHERV(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm, ierr)
Fortran Example
program gatherthem
    use mpi
    implicit none
    real, dimension(101) :: values, allvals
    real, dimension(:), allocatable :: myvals
    integer, dimension(8) :: sendcounts,offsets,displs
    integer :: i,n,nprow
    integer :: start_index,end_index
    integer :: rank, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    !For illustration only, don't hardcode nprocs
    if (nprocs /= 8) then
        stop "Example requires 8 processes"
    endif

    do i=1,101
        values(i)=i-1
    enddo
    allvals=0.0

    !Hand-distributing the numbers
    sendcounts=[12,12,11,12,13,9,10,8]
    offsets=[0,2,3,1,4,1,1,2]

    displs(1)=offsets(1)
    do i=2,nprocs
        displs(i)=displs(i-1)+sendcounts(i-1)+offsets(i)
    enddo

    allocate(myvals(sendcounts(rank+1)))
    ! Load data into local array
    start_index=displs(rank+1)+1
    end_index=displs(rank+1)+sendcounts(rank+1)
    myvals=values(start_index:end_index)

    call MPI_Gatherv(myvals,sendcounts(rank+1),MPI_REAL,allvals,sendcounts,displs,MPI_REAL,0,MPI_COMM_WORLD,ierr)

    nprow=101/nprocs
    if (rank == 0) then
        n=1
        do i=1,nprocs
            write(6,'(20f6.0)') allvals(n:n+nprow-1)
            n=n+nprow
        enddo
        write(6,'(20f6.0)') allvals(n:)
    endif

    call MPI_Finalize(ierr)

end program
Python
#recvcounts and displs are integer ndarrays
comm.Gatherv(sendbuf,[recvbuf,recvcounts,displs,MPI.TYPE],root=0)
As with comm.Scatterv, the MPI.TYPE is generally required.
Python Example
import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

#For illustration only
if nprocs != 8:
    print("Example requires 8 processes")
    sys.exit()

values=np.linspace(0.,100.,101)

#Hand-distributing numbers
sendcounts=np.array([12,12,11,12,13,9,10,8])
offsets=np.array([0,2,3,1,4,1,1,2])

displs=np.zeros(nprocs,dtype=int)
displs[0]=offsets[0]
for i in range(1,nprocs):
    displs[i]=displs[i-1]+sendcounts[i-1]+offsets[i]

# Index into values
start_index=displs[rank]
end_index=displs[rank]+sendcounts[rank]
myvals=values[start_index:end_index]
print(rank,myvals)

# Receive buffer
allvals=np.zeros(101)

comm.Gatherv(myvals,[allvals,sendcounts,displs,MPI.DOUBLE])

if (rank==0):
    print(allvals)
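Because the hand-picked offsets deliberately leave gaps in the receive buffer, the gather writes only sum(sendcounts) = 87 of the 101 slots of allvals; the remaining 14 positions keep the zeros they were initialized with. A quick check that could be appended on rank 0 (purely illustrative, not part of the original example):

if rank == 0:
    written = np.zeros(allvals.size, dtype=bool)
    for c, d in zip(sendcounts, displs):
        written[d:d+c] = True    # mark the slots covered by each rank's chunk
    print("filled", written.sum(), "of", allvals.size, "slots; gaps at", np.flatnonzero(~written))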