Global Communication in MPI: Reduction

In a reduction, data are sent to a root process, which applies a specified binary operation to successive pairs of data.

Consider this example, setting aside the details of the implementation (which, carried out literally as described here, would be very inefficient). We want to compute the sum of the local values of some variable over all ranks in the communicator group. The root process, rank 0 in our case, starts with its own value in its buffer. It receives the value from rank 1, adds it to its own, and stores the result in a “global” buffer. Next it receives the value from rank 2 and adds it to the contents of the global buffer. The value from rank 3 is then received and added in the same way. This continues until every rank has sent its value to the root process.
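A minimal sketch of this naive scheme with explicit sends and receives, written purely for illustration (a real MPI_Reduce implementation uses a much more efficient communication pattern, such as a tree), might look like the following:

#include <iostream>
#include "mpi.h"

// Naive "reduction" for illustration only: rank 0 receives each rank's
// value in turn and accumulates the sum. MPI_Reduce replaces this
// entire pattern.
int main(int argc, char *argv[]) {
    int nprocs, rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    float myval = rank + 1.0f;    // each rank's local value

    if (rank == 0) {
        float global = myval;     // the "global" buffer starts with rank 0's value
        for (int src = 1; src < nprocs; ++src) {
            float incoming;
            MPI_Recv(&incoming, 1, MPI_FLOAT, src, 0, MPI_COMM_WORLD,
                     MPI_STATUS_IGNORE);
            global += incoming;   // add each value as it arrives
        }
        std::cout << "sum = " << global << std::endl;
    }
    else {
        MPI_Send(&myval, 1, MPI_FLOAT, 0, 0, MPI_COMM_WORLD);
    }

    MPI_Finalize();
}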

The standard MPI datatypes that we have already seen are used for the operations that return a single value. If the buffer is an array, MPI_Reduce performs the operation elementwise.
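For example, with a count greater than 1 the operation is applied independently to corresponding elements across the ranks. A short sketch (the data are arbitrary):

#include <iostream>
#include "mpi.h"

// Elementwise reduction: with count=3, element i of every rank's sendbuf
// is summed into element i of the root's recvbuf.
int main(int argc, char *argv[]) {
    int nprocs, rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    float sendbuf[3] = {1.0f*rank, 2.0f*rank, 3.0f*rank};
    float recvbuf[3] = {0.0f, 0.0f, 0.0f};   // significant only on the root

    MPI_Reduce(sendbuf, recvbuf, 3, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        std::cout << recvbuf[0] << " " << recvbuf[1] << " "
                  << recvbuf[2] << std::endl;
    }

    MPI_Finalize();
}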

The built-in operations are the same for all languages.

Operation                  MPI_Op
bit-wise and               MPI_BAND
bit-wise or                MPI_BOR
bit-wise exclusive or      MPI_BXOR
logical and                MPI_LAND
logical or                 MPI_LOR
logical exclusive or       MPI_LXOR
sum                        MPI_SUM
product                    MPI_PROD
maximum                    MPI_MAX
minimum                    MPI_MIN
maximum and its index      MPI_MAXLOC
minimum and its index      MPI_MINLOC

The operations in Python mpi4py are the same, but with a period rather than an underscore, e.g. MPI.SUM.

MPI_MAXLOC and MPI_MINLOC require some special handling. They return both the value and an index, which in the usual case of one value per process is the rank holding the extreme value. The MPI_Datatypes for these two operations are particular to the language. For more details and examples see the documentation at the MPI Forum.

C/C++

int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
C++ Example

#include <iostream>
#include <iomanip>
#include "mpi.h"

using namespace std;

int main(int argc, char *argv[]) {

    int ncount, nprocs, rank;
    float total;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    ncount=100;

    cout<<setprecision(1)<<fixed;

    float *myvals=new float[ncount];
    // Load data into the local array
    myvals[0]=rank+1;
    for (int i=1;i<ncount;++i) {
        myvals[i]=myvals[i-1]+rank+1.;
    }

    // Local sum
    float mysum=0.0;
    for (int i=0;i<ncount;++i) {
        mysum+=myvals[i];
    }

    // After the reduction, total is significant only on the root process
    total=0.0;
    MPI_Reduce(&mysum,&total,1,MPI_FLOAT,MPI_SUM,0,MPI_COMM_WORLD);

    // Print in rank order
    for (int n=0;n<nprocs;++n) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (n==rank) {
            cout<<rank<<":"<<mysum<<" "<<total<<endl;
        }
    }

    delete [] myvals;

    MPI_Finalize();

}
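To try this example, compile it with your MPI distribution's wrapper compiler and launch it with the MPI job starter; the exact command names vary by installation, but typically something like (the file name here is arbitrary)

mpicxx reduce.cxx -o reduce
mpiexec -np 4 ./reduce

Each rank prints its own local sum, but only rank 0's copy of total holds the grand total.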
        


The special predefined types used for MPI_MAXLOC and MPI_MINLOC in C/C++ assume a struct has been defined, with the first member’s type as indicated and the second an int; a short sketch follows the table.

Types (value, index)       MPI_Datatype
float, int                 MPI_FLOAT_INT
double, int                MPI_DOUBLE_INT
long, int                  MPI_LONG_INT
int, int                   MPI_2INT
short, int                 MPI_SHORT_INT
long double, int           MPI_LONG_DOUBLE_INT
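As an illustrative sketch (the struct and variable names here are invented for the example), each rank can contribute a (value, rank) pair so that the root learns both the maximum value and which rank owned it:

#include <iostream>
#include "mpi.h"

// The pair layout required by MPI_FLOAT_INT: a float value followed by
// an int index, which we set to the contributing rank.
struct ValLoc {
    float val;
    int   loc;
};

int main(int argc, char *argv[]) {
    int nprocs, rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    ValLoc in, out;
    in.val = (rank % 3) + 1.0f;   // arbitrary illustrative data
    in.loc = rank;

    MPI_Reduce(&in, &out, 1, MPI_FLOAT_INT, MPI_MAXLOC, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        std::cout << "max " << out.val << " on rank " << out.loc << std::endl;
    }

    MPI_Finalize();
}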

Fortran

MPI_REDUCE(sendbuf, recvbuf, count, datatype, op, root, comm, ierr)
Fortran Example

program reduction
    use mpi
    implicit none

    real, dimension(:), allocatable :: myvals
    real                 :: mysum, total
    integer              :: i
    integer              :: rank, ncount, nprocs, ierr

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD,nprocs,ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)

    ncount=100
        
    allocate(myvals(ncount))
    ! Load data into local array
    myvals(1)=1
    do i=2,ncount
        myvals(i)=myvals(i-1)+1.
    enddo
    myvals=(rank+1)*myvals

    !Local sum
    mysum=sum(myvals)

    !After the reduction, total is significant only on the root process
    total=0.
    call MPI_Reduce(mysum,total,1,MPI_REAL,MPI_SUM,0,MPI_COMM_WORLD,ierr)

    print*,rank,mysum,total

    call MPI_Finalize(ierr)

end program
        


For MPI_MAXLOC and MPI_MINLOC, Fortran derived types are not accommodated at this time, so an array with two elements of the same type must be used. The index (the second element) can be coerced to an integer if necessary.

Type (both value and index)   MPI_Datatype
real                          MPI_2REAL
double precision              MPI_2DOUBLE_PRECISION
integer                       MPI_2INTEGER

Python

Note that the lower-case ‘reduce’ handles pickled objects; use the title-case ‘Reduce’ with NumPy arrays.

comm.Reduce(sendarr, recvarr, op=operation, root=0)
Python Example

import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
nprocs=comm.Get_size()
rank=comm.Get_rank()

num=100
myvals=(rank+1)*np.linspace(1.,num,int(num))

#Add the numbers locally; wrap the result in an array so it is a valid buffer
mysum=np.array([myvals.sum()])

#Get the grand total; only the root's copy of total is significant
total=np.zeros(1)

comm.Reduce(mysum,total,op=MPI.SUM)

print(rank,mysum[0],total[0])

The special datatypes for MAXLOC and MINLOC in mpi4py are the same as for C, but with the underscore replaced by a period as usual (MPI.FLOAT_INT).

Exercise

Return to the random-walk program with MPI. Add an appropriate reduction and print only the overall result.

Explanation

The purpose of parallelizing the random-walk code was to run a large number of trials in order to obtain a more accurate estimate of the final distance. Without the reduction, we would have to collect the results and average them ourselves; the reduction accomplishes this automatically.

C++

/* Solves a random-walk problem by a brute-force method.
 *
*/

#include <iostream>
#include <string>
#include <sstream>
#include <random>
#include <cmath>
#include <mpi.h>

using namespace std;

int main(int argc, char **argv) {

   random_device rd;
   mt19937 rng(rd());
   uniform_int_distribution<int> choice(1,4);

   int npes, rank;
   double total;

   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &npes);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);

   long long N;

   if (argc != 2) {
      cout<<rank<<":0,0,0\n";
      MPI_Finalize();
      return 0;
   }
   else {
      string steps=argv[1];
      stringstream ssteps;
      ssteps<<steps;
      ssteps>>N;
   }

   double x=0.;
   double y=0.;

   int direction;

   for (long long i=1; i<=N; ++i) {
      direction=choice(rng);
      switch (direction) {
         case 1: 
           x+=1.;
           break;
         case 2: 
           x-=1.;
           break;
         case 3: 
           y+=1.;
           break;
         case 4: 
           y-=1.;
           break;
      }
   }

   double eucDist=sqrt(x*x+y*y);

   MPI_Reduce(&eucDist,&total,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
   if (rank==0) {
       cout<<N<<","<<sqrt((double)N)<<","<<total/npes<<"\n";
   }

   MPI_Finalize();

   return 0;

}

Fortran

module random
  implicit none

  ! Author:  Katherine Holcomb
  ! Shortened version of a longer module
  ! Module incorporated into this file for convenience; typically it would be in its 
  ! own file.
  ! Comment out one or the other when the module is incorporated into a code.

  ! Single precision
  !integer, parameter   :: rk = kind(1.0)
  ! Double precision
  integer, parameter   :: rk = kind(1.0d0)

  contains
     subroutine set_random_seed(seed)
       ! Sets all elements of the seed array
       integer, optional, intent(in)       :: seed

       integer                             :: isize
       integer, dimension(:), allocatable  :: iseed
       integer, dimension(8)               :: idate
       integer                             :: icount, i

       call random_seed(size=isize)

       if ( .not. present(seed) ) then
          call system_clock(icount) 
          allocate(iseed(isize),source=icount+370*[(i,i=0,isize-1)]) 
       else
          allocate(iseed(isize))
          iseed=seed
       endif

       call random_seed(put=iseed)

     end subroutine set_random_seed

     function urand(lb,ub,seed)
       ! Returns a uniformly-distributed random number in the range lb to ub.
       real(rk)                        :: urand
       real(rk), optional, intent(in)  :: lb,ub
       real(rk), optional, intent(in)  :: seed

       integer                         :: iseed
       real(rk)                        :: rnd
       real(rk)                        :: lower,upper

       if ( present(seed) ) then
          iseed=int(seed)
          call set_random_seed(iseed)
       endif

       if ( present(lb) ) then
          lower=lb
       else
          lower=0.0_rk
       endif

       if ( present(ub) ) then
          upper = ub
       else
          upper = 1.0_rk
       endif

       call random_number(rnd)
       urand = lower+(upper-lower)*rnd

       return
     end function urand

     function randint(n,m)
       ! Returns a random integer between n and m.  
       integer                 :: randint
       integer, intent(in)     :: n,m

       randint=ceiling(urand(real(n-1,rk),real(m,rk)))

     end function randint

end module random

program random_walk
use random
use mpi
implicit none

  integer, parameter   :: ik = selected_int_kind(15)
  integer              :: nargs
  character(len=10)    :: ns
  integer              :: step
  integer(ik)          :: n, nsteps
  real, dimension(2)   :: pos, move
  real                 :: distance, total

  integer              :: rank, npes, ierr

  call MPI_Init(ierr)
  call MPI_Comm_size(MPI_COMM_WORLD,npes,ierr)
  call MPI_Comm_rank(MPI_COMM_WORLD,rank,ierr)
  
  nargs=command_argument_count()
  if ( nargs .ne. 1 ) then
     write(*,*) rank,":", 0, 0., 0.
     call MPI_Finalize(ierr)
     stop
  else
     call get_command_argument(1,ns)
     read(ns,'(i10)') nsteps
  endif

  call set_random_seed()

  pos=[0.,0.]

  do n=1,nsteps
     step=randint(1,4)
     if (step==1) then
        move=[0.,1.]
     else if (step==2) then
        move=[0.,-1.]
     else if (step==3) then
        move=[1.,0.]
     else
        move=[-1.,0.]
     endif
     pos=pos+move
  enddo

  distance=sqrt(pos(1)**2+pos(2)**2)

  call MPI_Reduce(distance,total,1,MPI_REAL,MPI_SUM,0,MPI_COMM_WORLD,ierr)

  if ( rank==0 ) then
     write(*,*) nsteps, sqrt(real(nsteps)), total/npes
  endif

  call MPI_Finalize(ierr)

end program

Python

import random
import sys
import numpy as np
from mpi4py import MPI

comm=MPI.COMM_WORLD
myrank=comm.Get_rank()
nprocs=comm.Get_size()

if len(sys.argv)>1:
   Nsteps=int(sys.argv[1])
else:
   print(0, 0., 0.)
   sys.exit()

pos=np.array([0,0])

moves=[np.array([0,1]),np.array([0,-1]),np.array([1,0]),np.array([-1,0])]

for n in range(Nsteps):
    move=random.choice(moves)
    pos+=move

#Wrap the local distance in an array so it is a valid send buffer
distance=np.array([np.sqrt(pos[0]**2+pos[1]**2)])
total=np.zeros(1)
comm.Reduce(distance,total,op=MPI.SUM)

if myrank==0:
   print(Nsteps, np.sqrt(Nsteps), total[0]/nprocs)

MPI.Finalize()
