Parallel Programming with MPI

The most widely used general-purpose communications library for distributed parallelization is MPI, the Message Passing Interface.

MPI works on multicore systems as well as multi-node, but the programming model is still different from threads. MPI starts a specified number of independent copies of the program, which then communicate with one another through the MPI library.

In MPI each process has an ID called its rank. Ranks are numbered from 0 to n-1, for n processes. No process shares memory with any other process whether running on the same node or not. All communications occur over the network. To use MPI the programmer must manage the distribution of the data to different processes and the communication among the processes. Each process runs the same script or program, so any difference in behavior by rank must be programmed.

MPI messages are identified by an “envelope” of metadata. This consists of the destination, the source (the “return address”), a communicator (a group of processes that will be exchanging information), and optionally a tag. A communicator consisting of all processes, called COMM_WORLD, is set up when the MPI program is initiated.

The process with rank 0 is usually called the root process. Since the minimum number of processes is 1, the root process is the only one that is guaranteed to be present. For this reason it is usually used to manage various bookkeeping tasks as well as input/output.

The mpi4py Package

The most popular direct way to use MPI with Python is the mpi4py package. If you are running on your own multicore system you can install it directly with conda as usual. If you are running on an HPC cluster that uses a resource manager such as Slurm, the process is more complicated.

When installing it into your environment in an HPC cluster, you should not use conda install because conda will install precompiled binaries and you must use a version of MPI that will correctly communicate with the system network and the resource manager. We suggest creating a conda environment for your MPI programs.

conda create -n "mpienv" python=3.11

In a cluster environment, it is best to install mpi4py from the conda-forge channel following their instructions here. This will install dummies into your environment that will be replaced by the external library when the package is imported. Do not try to install both MPICH and OpenMPI; use the one most appropriate to your system. In our example, we will install mpi4py with OpenMPI.

First load a compiler. Generally you will want to use a version of gcc that is compatible with that used for your installation of Python. Running the python interpreter from the command line will show this

python
Python 3.11.6 | packaged by conda-forge | (main, Oct  3 2023, 10:40:35) [GCC 12.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

For the above example, with the versions of gcc available on our system, this is gcc 11.4.0. Then check for available versions of OpenMPI (please use OpenMPI on UVA HPC systems) with

module spider openmpi

Now view the available external versions of OpenMPI from conda-forge

conda search -f openmpi -c conda-forge

For this example, we will use OpenMPI 4.1.4.

conda install -c conda-forge "openmpi=4.1.4=external_*"

You must use the external hook so that mpi4py will link to a version of MPI that will communicate with the cluster resource manager. Do not install openmpi directly from conda-forge.

Once this installation has completed, you can install mpi4py

conda install -c conda-forge mpi4py

MPI consists of dozens of functions, though most programmers need only a fraction of the total. The mpi4py package has implemented most of them using a “Pythonic” syntax, rather than the more C-like syntax used by other languages. One peculiarity of mpi4py is that only particular types may be communicated; in particular, only NumPy NDArrays or pickled objects are supported. To simplify our discussion, we will ignore the versions for pickled objects. The requirement that NumPy arrays be sent means that even single values must be represented as one-element NumPy arrays.

MPI requires more advanced programming skills so we will just show an example here. Our Monte Carlo pi program is well suited to MPI so we can use that. For much more information about programming with MPI in Python, as well as C++ and Fortran, please see our short course.

"""
 This program estimates the value of PI by running a Monte Carlo simulation.
 Imagining a virtual dart board of radius 1 (centered at the origin) and
 inscribed within a square of side-length 2 (centered at the origin), the
 program throws a large number of "darts" at the board and counts how many
 of the darts land within the circle.  Because we know that a circle of
 radius 1 has an area of PI, and because we know that a square of side-length
 2 has an area of 4, we can approximate the value of PI by considering that
 the ratio of the number of darts inside the circle to the number of darts
 outside the circle but in the square should be equal to the raio of PI over
 4.  Simplified, PI = 4 ! (dartsInside / dartsTotal).

 NOTE:  This is not how one would normally want to calculate PI, but serves
 to illustrate the principle.
"""

import sys
import os
import math
import random
import numpy as np
import time
from mpi4py import MPI

def pi(numPoints):
    """Throw a series of imaginary darts at an imaginary dartboard of unit 
        radius and count how many land inside the circle."""

    numInside=0
  
    for i in range(numPoints):
        x=random.random()
        y=random.random()
        if (x**2+y**2<1):
            numInside+=1

    pi=4.0*numInside/numPoints
    return pi

def main():

    if (len(sys.argv)>1):
        try:
            numPoints=int(float((sys.argv[1])))
        except:
            print("Argument must be an integer.")
    else:
        print("USAGE:python MonteCarlo.py numPoints")
        exit()

    myrank = MPI.COMM_WORLD.Get_rank()
    nprocs = MPI.COMM_WORLD.Get_size()

    chunks=numPoints%nprocs
    myNumPoints=[numPoints//nprocs+1]*chunks+[numPoints//nprocs]*(nprocs-chunks)

    tic=time.time ()
    mypi=np.ones(1)*pi(myNumPoints[myrank])
    ppi=np.zeros(1)
    MPI.COMM_WORLD.Reduce(mypi, ppi, op=MPI.SUM, root=0)
    ppi/=float(nprocs)
    toc=time.time()
    if myrank==0:
        print("Pi is approximately",ppi[0])
        print("Parallel time on "+str(nprocs)+" cores:"+str(round(toc-tic,4)))

        tic=time.time()
        spi=pi(numPoints)
        print("Pi is approximately ",spi)
        toc=time.time()
        print("Serial time:"+str(round(toc-tic,4)))

if __name__=="__main__":
    main()

The first invocation of MPI is the call to Get_rank. This returns the rank of the process that calls it. Remember that each MPI process runs as a separate executable; the only way their behaviors can be controlled individually is through the rank. This call also initializes MPI; a separate MPI.Init is not required. The next line allows us to find out how many processes are in COMM_WORLD. The number of processes for MPI programs is always set outside the program, and should never be hardcoded into the source code.

Next we divide up the number of “throws” into roughly equal chunks, just as we did in the corresponding Multiprocessing example. The same list is generated, but we use myrank to select the element of the list that each rank will use.

After this each process invokes the pi routine using myNumPoints for its rank. We need to collect all the estimates and average them. The reduction collects the results from each rank (in rank order) and applies the “op” (operation). A reduction must be a binary operation that returns another object of the same type; the operation is applied along the sequence. In this case we want the sum so we add the result from rank 0 to that from rank 1, then we take that sum and add the result from rank 2, and so forth. Dividing by the number of processes provides the average.

The Reduce function returns each result to the process regarded as the root, which would normally be 0 as it is in this case, and root carries out the operation. Thus only the root process knows the final result. We then select root to print out the value.

This example reruns the problem without distributing the throws in order to obtain a serial time for comparison purposes. Of course, a real application would not do that.

Running the MPI Python Program on an HPC System

In order to launch multiple tasks (or processes) of our program, we run this program through the MPI executor. On our HPC cluster this is srun.

srun python MonteCarloPiMPI.py 1000000000

Note: you cannot launch the MPI program with srun on the login nodes. In order to execute our program on designated compute node(s), we need to write a simple bash script that defines the compute resources we need. We call this our job script. For our example, the job script pimpi.sh looks like this:

#!/bin/bash
#SBATCH -N 1
#SBATCH --ntasks-per-node=8
#SBATCH -A hpc_training  #use a valid allocation for your login
#SBATCH -p interactive
#SBATCH -t 1:00:00

echo Running on `hostname`

#Load the versions of gcc, MPI, and Python/Anaconda in which you installed mpi4py
module load anaconda
module load gcc
module load openmpi

#Activate your environment. Edit to specify whatever name you chose.
conda activate mpienv

srun python MonteCarloPiMPI.py 100000000

The #SBATCH directives define the compute resources (-N, --ntasks-per-node, -p), compute wall time (-t), and the allocation account (--account) to be used. -N 1 specifies that all MPI tasks should run on a single node. We are limiting the number of nodes for this workshop so that everyone gets a chance to run their code on the shared resources. Be sure to edit the script to activate your environment by its correct name.

Submitting the job:

Open a terminal window. Do not load any modules. Execute this command:

sbatch pympi.slurm

Checking the job status:

Check the job status with the squeue -u or sacct commands as described in the Multiprocessing section.

Checking the output file:

Open the slurm-XXXXXX.out files in a text editor and record the total run time for the job.

Exercise

Rerun the MPI job using 1 or 4 cpu cores by changing the --ntasks-per-node option.

Scaling

On the cluster, the timings are very similar to Multiprocessing on the workstation.

CPU Cores Run Time
1 (serial) 400 sec
4 102 sec
8 60 sec
16 31 sec

Dask-MPI

Dask can use mpi4py on a high-performance cluster. First install mpi4py according to the instructions in the previous section, then pip install --user dask-mpi.

Schedulers

We have not discussed Dask schedulers previously. The scheduler is a process that managers the workers that carry out the tasks. We have been implicitly using the single-machine scheduler, which is the default. Within the single-machine scheduler are two options, threaded and processes. The threaded single-machine scheduler is the default for Dask Arrays, Dask Dataframes, and Dask Delayed. However, as we discussed with Multiprocessing, the GIL (Global Interpreter Lock) inhibits threading in general. Most of NumPy and Pandas release the GIL so threading works well with them. If you cannot use NumPy and Pandas then the processes scheduler is preferred. It is much like Multiprocessing.

To use Dask-MPI we must introduce the Dask distributed scheduler. The distributed scheduler may be preferable to processes even on a single machine, and it is required for use across multiple nodes.

Running Dask-MPI

We will discuss here only the batch interface for Dask MPI. Dask-MPI provides an initialize function that initializes MPI and sets up communications. You must then start a Client cluster to connect the workers. Dask-MPI uses rank 0 for the manager process and rank 1 to mediate between manager and workers, so you must request at least 3 processes. Close the cluster at the end of your script to avoid error messages about terminated processes.

Example Convert the “timeseries” example to Dask MPI.

import os
import dask
import dask_mpi as dm
import dask.distributed as dd
import dask.dataframe as ddf
from dask.distributed import Client

# Initialize Dask cluster and store worker files in current work directory
dm.initialize(local_directory=os.getcwd())

#Connect clients
client = Client()

df=dask.datasets.timeseries()
df2=df[df.y>0]
df3=df2.groupby('name').x.std()
df3.compute()
print("Finished")
client.close()

Run this simple example with

#!/bin/bash
#SBATCH --ntasks=4
#SBATCH -p standard
#SBATCH -A hpc_build
#SBATCH -t 00:10:00

module load gcc/11.4.0
module load openmpi
module load anaconda

export OMPI_MCA_mpi_warn_on_fork=0
srun python dask_df_mpi.py

The OMPI_MCA environment variable suppresses a warning message that is seldom relevant.

Using Dask-MPI is not difficult, especially in batch mode, but users interested in trying it should be sure to first understand the distributed scheduler, then study the online examples carefully. Dask-MPI does not require explicit calls to MPI but with the convenience comes some loss of control; the algorithm must be suited to the distributed scheduler. More information may be found in the documentation.

Full documentation for Dask-MPI is here.

Previous
Next