Global Communication in MPI: Many To Many

In many-to-many collective communications, all processes in the communicator group send a message to others.

Barrier

When MPI_Barrier is invoked, each process pauses until all processes in the communicator group have called this function. The MPI_BARRIER is used to synchronize processes. It should be used sparingly, since it “serializes” a parallel program. Most of the collective communication routines contain an implicit barrier so an explicit MPI_Barrier is not required.

C++

MPI_Barrier(MPI_Comm comm)

Fortran

call MPI_Barrier(comm, ierr)

Python

comm.Barrier()

It is seldom needed in Python. For examples in C++ and Fortran, please see

scatter.cxx

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs,rank;
    int sendcount, n;
    float values[100];

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    //For illustration only, don't hardcode nprocs
    if (100%nprocs !=0) {
	if (rank==0) {
            cout << "For this simple example, nprocs must evenly divide 100\n";
	}
	exit(1);
    }
    else {
        sendcount=100/nprocs;
    }

    for(int i=0;i<100;++i) {
        values[i]=i+1;
    }

    cout<<setprecision(1)<<fixed;

    float *myvals=new float[sendcount];

    MPI_Scatter(values,sendcount,MPI_FLOAT,myvals,sendcount,MPI_FLOAT,0,MPI_COMM_WORLD);

    //Forces each process to write separately and in order
    //Printing here is to demonstrate how the data are distributed
    for (n=0;n<nprocs;++n) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (n==rank) {
	    cout<<rank<<":";
	    for (int i=0;i<sendcount;++i) {
		cout<<" "<<myvals[i]<<" ";
	    }
	    cout<<"\n";
	}
    }

    MPI_Finalize();

}
        


scatter.f90

program scatterthem
    use mpi
    implicit none

    real, dimension(100) :: values
    real, dimension(:), allocatable :: myvals
    integer              :: rank, nprocs, ierr
    integer              :: sendcount, i, n

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    !For illustration only, don't hardcode nprocs
    if (mod(100,nprocs)/=0) then
        if (rank==0) then
            print*, "For this simple example, nprocs must evenly divide 100"
        endif
        call MPI_Finalize(ierr)
        stop 
    endif

    ! Only root has to know the global values
    if (rank==0) then
        do i=1,100
            values(i)=i
        enddo
    endif

    sendcount=100/nprocs
    allocate(myvals(sendcount))

    call MPI_Scatter(values,sendcount,MPI_REAL,myvals,sendcount,MPI_REAL,0,MPI_COMM_WORLD,ierr)

    !Forces each process to write separately and in order
    !Printing here is to demonstrate how the data are distributed
    do n=0,nprocs-1
        call MPI_Barrier(MPI_COMM_WORLD,ierr)
        if (n==rank) then
            write(6,'(i4,a)',advance='no') rank,":"
            do i=1,size(myvals)-1
                write(6,'(f5.0)',advance='no') myvals(i)
            enddo
            write(6,'(f5.0)') myvals(size(myvals))
        endif
    enddo

    call MPI_Finalize(ierr)

end program

In these examples, it is used in a loop to force the output from the processes to be separated distinctly and in rank order. Upon entry into the loop, all processes synchronize before executing the loop body. The process whose rank matches the loop variable writes its output, while the other processes skip back to the top of the loop. However, they must wait there until the process doing the writing finishes and invokes MPI_Barrier.

Exercise

Write a program that generates an array of values from 1 to 10 only on the root process. Broadcast the array to each process. Print the array from each process.

Test it with four processes on the frontend or on your workstation.

C++

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs,rank;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    int *values=new int[10];
    for (int i=1; i<=10; ++i){
        values[i-1]=i;
    }

    MPI_Bcast(values,10,MPI_INT,0,MPI_COMM_WORLD);

    cout<<rank<<" "<<values[0];
    for (int i=1; i<9; ++i) {
       cout<<" "<<values[i];
    }
    cout<<" "<<values[9]<<endl;

    MPI_Finalize();

}
        


Fortran

program broadcaster
    use mpi
    implicit none

    integer           :: nprocs, rank, ierr
    integer, dimension(:), allocatable :: values
    integer           :: i

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    allocate(values(10))

    if ( rank==0 ) then
        values=[(i,i=1,10)]
    endif

    call MPI_Bcast(values,size(values),MPI_INTEGER,0,MPI_COMM_WORLD,ierr)

    write(*,'(11i5)'), rank, values

    call MPI_Finalize(ierr)

end program 

Python

import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

#Must declare space in all ranks.  Make sure the dtypes match.
values=np.empty(10,dtype='int')

if rank==0:
    values=np.arange(1,11,1,dtype='int')

comm.Bcast([values,MPI.INT])
print(rank,values)

Allreduce

As the examples in the previous chapter demonstrated, when MPI_Reduce is called, only the root process knows the result. This can lead to bugs since it is fairly common that all processes should be aware of the global value. The MPI_Allreduce procedure performs the reduction and distributes the result. It should always be used rather than a Reduce followed by a Bcast, since the implementation should carry this out more efficiently than with an explicit broadcast.

The syntax for MPI_Allreduce is identical to that of MPI_Reduce but with the root number omitted.

int MPI_Allreduce(void *operand, void *result, int ncount, MPI_Datatype type, MPI_Op operator, MPI_Comm comm );
call MPI_ALLREDUCE(sendbuf, recvbuf, ncount, datatype, op, comm, ierr)
comm.Allreduce(sendarr, recvarr, operation)

Exercise Modify the example reduction code in your language of choice to perform an Allreduce.

C++ Solution

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int ncount,nprocs,rank;
    int n;
    float total;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    ncount=100;

    cout<<setprecision(1)<<fixed;

    float *myvals=new float[ncount];
    // load data into local arrays
    
    myvals[0]=rank+1;
    for (int i=1;i<ncount;++i) {
	myvals[i]=myvals[i-1]+rank+1.;
    }

    float mysum=0.0;
    for (int i=0;i<ncount;++i) {
	mysum+=myvals[i];
    }

    MPI_Allreduce(&mysum,&total,1,MPI_FLOAT,MPI_SUM,MPI_COMM_WORLD);

    for (int n=0;n<nprocs;++n) {
	MPI_Barrier(MPI_COMM_WORLD);
	if (n==rank) {
            cout<<rank<<":"<<mysum<<" "<<total<<endl;
	}
    }

    MPI_Finalize();

}
        


Fortran Solution

program reduction
    use mpi
    implicit none

    real, dimension(:), allocatable :: myvals
    real                 :: mysum, total
    integer              :: i,n,nprow
    integer              :: rank, ncount, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    ncount=100
        
    allocate(myvals(ncount))
    ! Load data into local array
    myvals(1)=1
    do i=2,ncount
        myvals(i)=myvals(i-1)+1.
    enddo
    myvals=(rank+1)*myvals

    !Local sum
    mysum=sum(myvals)

    call MPI_Allreduce(mysum,total,1,MPI_REAL,MPI_SUM,MPI_COMM_WORLD,ierr)
    
    print*,rank,mysum,total

    call MPI_Finalize(ierr)

end program
        


Python Solution

import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

num=100
myvals=(rank+1)*np.linspace(1.,num,int(num))

#Add the numbers locally
mysum=myvals.sum()

#Get the grand total
total=np.zeros(1)

comm.Allreduce(mysum,total,op=MPI.SUM)

print(rank,mysum,total)

Allgather/Allgatherv

An allgather is the same as a gather, but each process sends into to the receive buffer of every other process. As for Reduce/Allreduce, the syntax is essentially the same as for gather/gatherv, but without a root process specified. The global result array must be created on each process for Allgather.

C++

int MPI_Allgather(void *sendbuffer, int ncount, MPI_Datatype datatype, void *recvbuffer, int ncount, MPI_Datatype datatype, MPI_Comm communicator)

Fortran

<type><dimension><size>   :: vars
<type><dimension><size>   :: all_vars
integer  :: ncount, root, err
! more code
call MPI_ALLGATHER(vars, ncount, MPI_TYPE, all_vars, ncount, MPI_TYPE, MPI_COMM_WORLD, err)

Python

As before, for this syntax, both buffers should be NumPy arrays.

comm.Allgather([data,MPI.TYPE],[all_data,MPI.TYPE])

Exercise Modify the example gather code in your language of choice to perform an Allgather.

C++ Solution

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int ncount,nprocs,rank;
    int n;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    ncount=10;
    float *allvals=new float[ncount*nprocs];

    cout<<setprecision(1)<<fixed;

    float *myvals=new float[ncount];
    // load data into local arrays
    
    myvals[0]=(rank+1);
    for (int i=1;i<ncount;++i) {
	myvals[i]=myvals[i-1]+rank+1.;
    }

    MPI_Allgather(myvals,ncount,MPI_FLOAT,allvals,ncount,MPI_FLOAT,MPI_COMM_WORLD);

    int nprow=ncount;
    if (rank == 0) {
        n=0;
        for (int i=0;i<=nprocs-1;++i) {
	    for (int j=n;j<=n+nprow-1;++j) {
	        cout<<" "<<allvals[j]<<" ";
            }
           n=n+nprow;
	   cout<<"\n";
	}
	cout<<"\n";
    }

    MPI_Finalize();

}
        


Fortran Solution

program gatherthem
    use mpi
    implicit none

    real, dimension(:), allocatable :: myvals
    real, dimension(:), allocatable :: allvals
    integer              :: i,n,nprow
    integer              :: rank, ncount, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    ncount=10

    allocate(allvals(nprocs*ncount))
        
    allocate(myvals(ncount))
    ! Load data into local array
    myvals(1)=(rank+1)
    do i=2,ncount
        myvals(i)=myvals(i-1)+rank+1.
    enddo

    call MPI_Allgather(myvals,ncount,MPI_REAL,allvals,ncount,MPI_REAL,MPI_COMM_WORLD,ierr)

    nprow=ncount
    if (rank == 0) then
        n=1
        do i=1,nprocs
           write(6,'(20f6.0)') allvals(n:n+nprow-1)
           n=n+nprow
        enddo
        write(6,'(20f6.0)') allvals(n:)
    endif

    call MPI_Finalize(ierr)

end program
        


Python Solution

import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

myvals=(rank+1)*np.linspace(1.,10.,10)
print(rank,myvals)
sendcount=myvals.size

allvals=np.zeros(sendcount*nprocs)
comm.Allgather([myvals,MPI.DOUBLE],[allvals,MPI.DOUBLE])

print(rank,allvals)

Alltoall

In MPI_Alltoall, each process sends data to every other process. Let us consider the simplest case, when each process sends one item to every other process. Suppose there are three processes and rank 0 has an array containing the values [0,1,2], rank 1 has [10,11,12], and rank 2 has [20,21,22]. Rank 0 keeps (or sends to itself) the 0 value, sends 1 to rank 1, and 2 to rank 2. Rank 1 sends 10 to rank 0, keeps 11, and sends 12 to rank 2. Rank 2 sends 20 to rank 0, 21 to rank 1, and keeps 22.

Alltoall. Note that as depicted, the values in the columns are transposed to values as rows.

C++

alltoall.cxx

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int nprocs,rank;
    int n;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    float *recvals=new float[nprocs];

    cout<<setprecision(1)<<fixed;

    float *myvals=new float[nprocs];
    // load data into local arrays
    
    myvals[0]=(rank+1)*100.;
    for (int i=1;i<nprocs;++i) {
	myvals[i]=myvals[i-1]+1.;
    }

    MPI_Alltoall(myvals,1,MPI_FLOAT,recvals,1,MPI_FLOAT,MPI_COMM_WORLD);

    for (int n=0;n<nprocs;++n) {
	MPI_Barrier(MPI_COMM_WORLD);
	if (n==rank) {
            for (int i=0;i<nprocs;++i) {
                cout<<rank<<":"<<recvals[i]<<" ";
            }
            cout<<"\n";
	 }
    }

    MPI_Finalize();

}
        


Fortran

alltoall.f90

program everybodyskate
    use mpi
    implicit none

    real, dimension(:), allocatable :: myvals
    real, dimension(:), allocatable :: recvals
    integer              :: i
    integer              :: rank, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    allocate(myvals(nprocs))

    myvals(1)=(rank+1)*100.
    do i=2,nprocs
        myvals(i)=myvals(i-1)+1.
    enddo

    allocate(recvals(nprocs))
    call MPI_Alltoall(myvals,1,MPI_REAL,recvals,1,MPI_REAL,MPI_COMM_WORLD,ierr)

    call MPI_Finalize(ierr)

end program

Python

alltoall.py

import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

mystart=rank*100.
myend=mystart+nprocs
myvals=np.arange(mystart,myend,1.)

# Receive buffer
recvals=np.zeros(myvals.size)

comm.Alltoall(myvals,recvals)

print(rank,recvals)

Two more general forms of alltoall exist; MPI_Alltoallv, which is similar to MPI_Allgatherv in that varying data sizes and displacements are allowed; and MPI_Alltoallw, which is even more general in that the ‘chunks’ of data on the processes can be of different datatypes. These procedures are beyond our scope but the interested reader can consult the documentation.

MPI_IN_PLACE

We often do not need one buffer once the message has been communicated, and allocating two buffers wastes memory and requires some amount of unneeded communication. MPI collective procedures allow the special buffer MPI_IN_PLACE. This special value can be used instead of the receive buffer in Scatter and Scatterv; in the other collective functions it takes the place of the send buffer. The expected send and receive buffers must be the same size for this to be valid. As usual for mpi2py, the Python name of the variable is MPI.IN_PLACE.

Examples

MPI_Scatter(sendbuf, ncount, MPI_Datatype, MPI_IN_PLACE, ncount, MPI_Datatype, root, MPI_COMM_WORLD);

MPI_Reduce(MPI_IN_PLACE, recvbuf, ncount, MPI_Datatype, MPI_Op, root, MPI_COMM_WORLD);
call MPI_Scatter(vals, ncount, MPI_TYPE, MPI_IN_PLACE, ncount, MPI_TYPE, root, MPI_COMM_WORLD)

call MPI_REDUCE(MPI_IN_PLACE, recvbuf, ncount, MPI_TYPE, MPI_Op, root, MPI_COMM_WORLD, ierr)
comm.Scatter([sendvals,MPI.DOUBLE],MPI.IN_PLACE,root=0)

comm.Reduce(sendarr, MPI.IN_PLACE, operation, root=0)
Previous
Next