Python for Parallel Computing

This chapter covers parallel computing and the module mpi4py. Complex and time-consuming computational tasks can often be divided into subtasks, which can be carried out simultaneously if there is capacity for it. When these subtasks are independent of each other, executing them in parallel can be especially efficient. Situations where subtasks have to wait until another subtask is completed are less suited for parallel computing.

Consider the task of computing an integral of a function by a quadrature rule:

$$\int_a^b f(x)\,\mathrm{d}x \approx \sum_{i=0}^{N} w_i f(x_i)$$

with nodes $x_i \in [a,b]$ and weights $w_i$. If the evaluation of $f$ is time-consuming and $N$ is large, it would be advantageous to split the problem into two or several subtasks of smaller size:

$$\sum_{i=0}^{N} w_i f(x_i) = \sum_{i=0}^{N_1} w_i f(x_i) + \sum_{i=N_1+1}^{N} w_i f(x_i)$$

We can use several computers and give each of them the necessary information so that they can perform their subtasks, or we can use a single computer with a so-called multicore architecture.

Once the subtasks are accomplished the results are communicated to the computer or processor that controls the entire process and performs the final additions.
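
To make the splitting concrete before any parallelism is involved, here is a minimal serial sketch; the trapezoidal rule, the node count, and the integrand are assumptions chosen only for illustration. The two partial sums could later be assigned to two different cores:

import numpy as np

def partial_quad(f, x, w):
    # one portion of the quadrature sum: sum_i w_i * f(x_i)
    return np.sum(w * f(x))

# Composite trapezoidal rule on [0, 1] with N subintervals
f = np.sin
N = 1000
x = np.linspace(0., 1., N + 1)
w = np.full(N + 1, 1./N)
w[0] = w[-1] = 0.5/N            # trapezoidal weights

# Split the quadrature sum into two subtasks of smaller size
half = (N + 1)//2
s1 = partial_quad(f, x[:half], w[:half])
s2 = partial_quad(f, x[half:], w[half:])
print(s1 + s2)                  # approximately 1 - cos(1) ≈ 0.4597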

We will use this integral computation as a guiding example in this chapter while covering the following topics:

  • Multicore computers and computer clusters
  • Message passing interface (MPI)

18.1 Multicore computers and computer clusters

Most modern computers are multicore computers. For example, the laptop used when writing this book has an Intel® i7-8565U processor with four cores and two threads per core.

What does this mean? Four cores on a processor allow four computational tasks to be performed in parallel. Four cores with two threads each are often counted as eight CPUs by system monitors. For the purposes of this chapter, only the number of cores matters.
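
If you want to check these numbers on your own machine, a minimal sketch using the standard library is the following; note that os.cpu_count() reports logical CPUs (hardware threads), not physical cores:

import os
print(os.cpu_count())   # number of logical CPUs, e.g. 8 for 4 cores with 2 threads each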

These cores share a common memory—the RAM of your laptop—and have individual memory in the form of cache memory:

Figure 18.1: A multicore architecture with shared and local cache memory

The cache memory is used optimally by its core and is accessed at high speed, while the shared memory can be accessed by all cores of one CPU. On top, there is the computer's RAM memory and finally, the hard disk, which is also shared memory.

In the next section, we will see how a computational task can be distributed to individual cores and how results are received and further processed, for example, being stored in a file.

A different setting for parallel computing is the use of a computer cluster. Here, a task is divided into parallelizable subtasks that are sent to different computers, sometimes even over long distances. Here, communication time can matter substantially. The use of such a computer cluster makes sense only if the time for processing subtasks is large in relation to communication time.

18.2 Message passing interface (MPI)

Programming for several cores or on a computer cluster with distributed memory requires special techniques. We describe here message passing and related tools standardized by the MPI standard. These tools are similar in different programming languages, such as C, C++, and FORTRAN, and are realized in Python by the module mpi4py.

18.2.1 Prerequisites

You need to install this module first by executing the following in a terminal window:

conda install mpi4py

The module is imported by adding the following line to your Python script:

import mpi4py as mpi

The execution of a parallelized code is done from a terminal with the command mpiexec. Assuming that your code is stored in the file script.py, executing this code on a computer with a four-core CPU is done in the terminal window by running the following command:

mpiexec -n 4 python script.py

Alternatively, to execute the same script on a cluster with two computers, run the following in a terminal window:

mpiexec --hostfile=hosts.txt python script.py

You have to provide a file hosts.txt containing the names or IP addresses of the computers with the number of their cores you want to bind to a cluster:

# Content of hosts.txt
192.168.1.25:4  # master computer with 4 cores
192.168.1.101:2 # worker computer with 2 cores

The Python script, here script.py, has to be copied to all computers in the cluster.

18.3 Distributing tasks to different cores

When a script is executed on a multicore computer, we can think of mpiexec as copying the given Python script to each of the requested cores and running every copy there. As an example, consider the one-liner script print_me.py, which contains just the command print("Hello it's me"); run under mpiexec with four processes, it prints the same message four times, each time from a different core.
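
The script and the command to run it look like this (the filename print_me.py is the one mentioned above):

# Content of print_me.py
print("Hello it's me")

mpiexec -n 4 python print_me.py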

In order to be able to execute different tasks on different cores, we have to be able to distinguish these cores in the script.

To this end, we create a so-called communicator instance, which organizes the communication between the world, that is, the input and output units like the screen, the keyboard, or a file, and the individual cores. Furthermore, each core is given an identifying number, called its rank:

from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned

The communicator attribute size refers to the total number of processes specified with the mpiexec command.

Now we can give every core an individual computational task, as in the next script, which we might call basicoperations.py:


from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned
a=15
b=2
if rank==0:
    print(f'Core {rank} computes {a}+{b}={a+b}')
if rank==1:
    print(f'Core {rank} computes {a}*{b}={a*b}')
if rank==2:
    print(f'Core {rank} computes {a}**{b}={a**b}')

This script is executed in the terminal by entering the following command:

mpiexec -n 3 python basicoperations.py

We obtain three messages:

Core 0 computes 15+2=17
Core 2 computes 15**2=225
Core 1 computes 15*2=30

All three processes got their individual tasks, which were executed in parallel. Clearly, printing the result to the screen is a bottleneck as the screen is shared by all three processes.

In the next section, we see how communication between the processes is done.

18.3.1 Information exchange between processes

There are different ways to send and receive information between processes:

  • Point-to-point communication
  • One-to-all and all-to-one
  • All-to-all

In this section, we will introduce point-to-point, one-to-all, and all-to-one communication.

Speaking to a neighbor and letting information pass along a street in this way is an everyday example of the first communication type in the preceding list, while the second can be illustrated by the daily news, spoken by one person and broadcast to a large group of listeners.

Figure 18.2: Point-to-point communication and one-to-all communication

In the next subsections, we will study these different communication types in a computational context.

18.3.2 Point-to-point communication

Point-to-point communication directs information flow from one process to a designated receiving process. We first describe the methods and features by considering a ping-pong situation and a telephone-chain situation and explain the notion of blocking.

Point-to-point communication is applied in scientific computing, for instance in random-walk or particle-tracing applications on domains that are divided into a number of subdomains corresponding to the number of processes that can be carried out in parallel.

The ping-pong example assumes that we have two processors sending an integer back and forth to each other and increasing its value by one.

We start by creating a communicator object and checking that we have two processes available:

from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned
if not (size==2):
    raise Exception(f"This example requires two processes. "
                    f"{size} processes given.")

Then we send information back and forth between the two processes:

count = 0
text=['Ping','Pong']
print(f"Rank {rank} activities:\n==================")
while count < 5:
    if rank == count%2:
        print(f"In round {count}: Rank {rank} says {text[count%2]} "
              f"and sends the ball to rank {(rank+1)%2}")
        count += 1
        comm.send(count, dest=(rank+1)%2)
    elif rank == (count+1)%2:
        count = comm.recv(source=(rank+1)%2)

Information is sent by the method send of the communicator. Here, we provided it with the information that we want to send, along with the destination. The communicator takes care that the destination information is translated into a hardware address: either a core of the CPU in your machine or that of a host in the cluster.

The other process receives the information through the communicator method comm.recv. It requires information on where the data is expected from. Under the hood, it tells the sender that the information has been received by freeing the buffer on the data channel. The sender awaits this signal before it can proceed.

The two statements if rank == count%2 and elif rank == (count+1)%2 ensure that the processors alternate their sending and receiving tasks.

Here is the output of this short script that we saved in a file called pingpong.py and executed with the following:

mpiexec -n 2 python pingpong.py 

In the terminal, this produces the following output:

Rank 0 activities:
==================
In round 0: Rank 0 says Ping and sends the ball to rank 1
In round 2: Rank 0 says Ping and sends the ball to rank 1
In round 4: Rank 0 says Ping and sends the ball to rank 1
Rank 1 activities:
==================
In round 1: Rank 1 says Pong and sends the ball to rank 0
In round 3: Rank 1 says Pong and sends the ball to rank 0

What types of data can be sent or received? As the commands send and recv communicate data in binary form, they pickle the data first (see Section 14.3: Pickling). Most Python objects can be pickled, but lambda functions, for instance, cannot. It is also possible to pickle buffered data such as NumPy arrays, but a direct send of buffered data is more efficient, as we'll see in the next subsection.
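
As a small illustration (a hypothetical sketch, not one of this chapter's main examples; the dictionary contents are arbitrary), any picklable object can be passed directly to send:

from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'step': 42, 'values': [1.0, 2.5, 3.0]}  # an arbitrary picklable object
    comm.send(data, dest=1)
    # comm.send(lambda x: x**2, dest=1)  # would fail: lambda functions cannot be pickled
if rank == 1:
    data = comm.recv(source=0)
    print(f'Rank 1 received {data}')

This script is run with mpiexec -n 2, just like the ping-pong example.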

Note that there might be reasons for sending and receiving functions between processes. As the methods send and recv only communicate references to functions, the references have to exist on both the sending and the receiving processors. Therefore, the following Python script returns an error:

from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned

if rank==0:
    def func():
        return 'Function called'
    comm.send(func, dest=1)
if rank==1:
    f=comm.recv(source=0) # <<<<<< This line reports an error
    print(f())

The error message thrown by the statement recv is AttributeError: Can't get attribute 'func'. This is caused by the fact that f refers to the function func, which is not defined for the processor with rank 1. The correct way is to define this function for both processors:

from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned

def func():
    return 'Function called'
if rank==0:
    comm.send(func, dest=1)
if rank==1:
    f=comm.recv(source=0)
    print(f())

18.3.3 Sending NumPy arrays

The commands send and recv are high-level commands. That means they do under-the-hood work that saves the programmer time and avoids possible errors. They allocate memory after having internally deduced the datatype and the amount of buffer data needed for communication. This is done internally on a lower level based on C constructions.

NumPy arrays are objects that themselves make use of such C-buffer-like objects, so when sending and receiving NumPy arrays you can gain efficiency by using the lower-level communication counterparts Send and Recv (mind the capitalization!).

In the following example, we send an array from one processor to another:


from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned
import numpy as np

if rank==0:
    A = np.arange(700)
    comm.Send(A, dest=1)
if rank==1:
    A = np.empty(700, dtype=int)  # This is needed for memory allocation
                                  # of the buffer on Processor 1
    comm.Recv(A, source=0)        # Note the difference to recv in
                                  # providing the data
    print(f'An array received with last element {A[-1]}')

It is important to note that memory for the buffer has to be allocated on both processors. Here, this is done by creating an array with the data on Processor 0 and an array of the same size and datatype, but with arbitrary data, on Processor 1.

Also, we see a difference to the command recv concerning the output: the command Recv returns the buffer via its first argument. This is possible as NumPy arrays are mutable.

18.3.4 Blocking and non-blocking communication

The commands send and recv and their buffer counterparts Send and Recv are so-called blocking commands. That means that a send command is completed only when the corresponding send buffer is freed. When this happens depends on several factors, such as the particular communication architecture of your system and the amount of data to be communicated. Ultimately, a send command is considered completed when the corresponding recv command has received all the information. Without such a recv command, the sender will wait forever. This is called a deadlock situation.

The following script demonstrates a situation with deadlock potential: both processes send simultaneously. If the amount of data to be communicated is too big to be buffered, each send waits for a corresponding recv to empty the pipe, but neither recv is ever reached because both processes are still waiting in their send. That is a deadlock.

from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned

if rank==0:
    msg=['Message from rank 0',list(range(101000))]
    comm.send(msg, dest=1)
    print(f'Process {rank} sent its message')
    s=comm.recv(source=1)
    print(f'I am rank {rank} and got a {s[0]} with a list of '
          f'length {len(s[1])}')
if rank==1:
    msg=['Message from rank 1',list(range(-101000,1))]
    comm.send(msg,dest=0)
    print(f'Process {rank} sent its message')
    s=comm.recv(source=0)
    print(f'I am rank {rank} and got a {s[0]} with a list of '
          f'length {len(s[1])}')

Note that executing this code might not cause a deadlock on your computer as the amount of data communicated is very small.

The straightforward remedy to avoid a deadlock, in this case, is to swap the order of the commands recv and send on one of the processors:

from mpi4py import MPI
comm=MPI.COMM_WORLD  # making a communicator instance
rank=comm.Get_rank() # querying for the numeric identifier of the core
size=comm.Get_size() # the total number of cores assigned

if rank==0:
    msg=['Message from rank 0',list(range(101000))]
    comm.send(msg, dest=1)
    print(f'Process {rank} sent its message')
    s=comm.recv(source=1)
    print(f'I am rank {rank} and got a {s[0]} with a list of '
          f'length {len(s[1])}')
if rank==1:
    s=comm.recv(source=0)
    print(f'I am rank {rank} and got a {s[0]} with a list of '
          f'length {len(s[1])}')
    msg=['Message from rank 1',list(range(-101000,1))]
    comm.send(msg,dest=0)
    print(f'Process {rank} sent its message')
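
Another remedy, hinted at by the title of this subsection but not used further in this chapter, is non-blocking communication: mpi4py also provides the methods isend and irecv, which return request objects immediately instead of blocking. Here is a minimal sketch (the message content is arbitrary):

from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank==0:
    req = comm.isend(['Message from rank 0'], dest=1)  # returns immediately
    # ... other work can be done here while the message is in transit ...
    req.wait()                                         # complete the send
if rank==1:
    req = comm.irecv(source=0)
    msg = req.wait()                                   # block only here, to get the data
    print(f'Rank 1 received {msg[0]}')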

18.3.5 One-to-all and all-to-one communication

When a complex task depending on a larger amount of data is divided into subtasks, the data also has to be divided into portions relevant to the related subtask and the results have to be assembled and processed into a final result.

Let's consider as an example the scalar product of two vectors $u, v \in \mathbb{R}^N$, divided into $m$ subtasks:

$$u \cdot v = \sum_{i=1}^{N} u_i v_i = \sum_{i=1}^{N_1} u_i v_i + \sum_{i=N_1+1}^{N_2} u_i v_i + \dots + \sum_{i=N_{m-1}+1}^{N} u_i v_i$$

with $1 \le N_1 < N_2 < \dots < N_{m-1} < N$. All subtasks perform the same operations on portions of the initial data, the results have to be summed up, and possibly any remaining operations have to be carried out.

We have to perform the following steps:

  1. Creating the vectors u and v
  2. Dividing them into m subvectors with a balanced number of elements, that is, N/m elements each if N is divisible by m, otherwise with some subvectors having one element more
  3. Communicating each subvector to "its" processor
  4. Performing the scalar product on the subvectors on each processor
  5. Gathering all results
  6. Summing up the results

Steps 1, 2, and 6 are run on one processor, the so-called root processor. In the following example code, we choose the processor with rank 0 for these tasks. Steps 3, 4, and 5 are executed on all processors, including the root processor. For the communication in Step 3, mpi4py provides the command scatter, and for recollecting the results the command gather is available.

Preparing the data for communication

First, we will look into Step 2. It is a nice exercise to write a script that splits a vector into m pieces with a balanced number of elements. Here is one suggestion for such a script, among many others, which we store in the file splitarray.py so that it can be imported later:

import numpy

def split_array(vector, n_processors):
    # splits an array into a number of subarrays
    # vector: one-dimensional ndarray or a list
    # n_processors: integer, the number of subarrays to be formed

    n=len(vector)
    n_portions, rest = divmod(n,n_processors) # division with remainder
    # get the amount of data per processor and distribute the rest
    # over the first processors so that the load is distributed
    # as evenly as possible
    # Construction of the indices needed for the splitting
    counts = [0]+ [n_portions + 1
                   if p < rest else n_portions for p in range(n_processors)]
    counts=numpy.cumsum(counts)
    start_end=zip(counts[:-1],counts[1:])       # a generator
    slice_list=(slice(*sl) for sl in start_end) # a generator expression
    return [vector[sl] for sl in slice_list]    # a list of subarrays

As this chapter is one of the last in this book, we have already seen a lot of tools that can be used in this code. We worked with NumPy's cumulative sum, cumsum. We used zip, argument unpacking with the operator *, and a generator expression. We also tacitly introduced the data type slice, which allows us to do the splitting in the last line in a very compact way.
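
A quick check of the splitting, assuming the function above was saved in splitarray.py:

import numpy as np
from splitarray import split_array

print(split_array(np.arange(10), 3))
# [array([0, 1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]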

The commands – scatter and gather

Now we are ready to look at the entire script for our demo problem, the scalar product:

from mpi4py import MPI
import numpy as np
import splitarray as spa

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocessors = comm.Get_size()

if rank == 0:
    # Here we generate data for the example
    n = 150
    u = 0.1*np.arange(n)
    v = - u
    u_split = spa.split_array(u, nprocessors)
    v_split = spa.split_array(v, nprocessors)
else:
    # On all processors we need variables with these names,
    # otherwise the scatter commands below would raise a NameError
    u_split = None
    v_split = None
# These commands now run on all processors
u_split = comm.scatter(u_split, root=0) # the data is distributed portion-wise
                                        # from the root
v_split = comm.scatter(v_split, root=0)
# Each processor computes its part of the scalar product
partial_dot = u_split@v_split
# Each processor reports its result back to the root
partial_dot = comm.gather(partial_dot, root=0)

if rank == 0:
    # partial_dot is a list of all collected results
    total_dot = np.sum(partial_dot)
    print(f'The parallel scalar product of u and v '
          f'on {nprocessors} processors is {total_dot}. '
          f'The difference to the serial computation '
          f'is {abs(total_dot-u@v)}')

If this script is stored in a file parallel_dot.py the command for execution with five processors is the following:

mpiexec -n 5 python parallel_dot.py

The result in this case is as follows:

The parallel scalar product of u and v on 5 processors is -11137.75.
The difference to the serial computation is 0.0

This example demonstrates the use of scatter to send out specific information to each processor. To use this command the root processor has to provide a list with as many elements as available processors. Each element contains the data to be communicated to one of the processors including the root processor.

The reverse operation is gather. When all processors have completed this command, the root processor is provided with a list with as many elements as there are processors, each element containing the result data of its corresponding processor.

In the final step, the root processor again works alone by postprocessing this result list. In the example above, it sums all list elements and displays the result.

The art of parallel programming is to avoid bottlenecks. Ideally, all processors should be busy and should start and stop simultaneously. That is why the workload is distributed more or less equally to the processors by the script splitarray that we described previously. Furthermore, the code should be organized in such a way that the start and end periods with the root processor working alone are short compared to the computationally intense part carried out by all processors simultaneously.

A final data reduction operation – the command reduce

The parallel scalar product example is typical of many other tasks in the way the results are handled: the data coming from all processors is reduced to a single number in the last step. Here, the root processor sums up all partial results from the processors. The command reduce can be used efficiently for this task. We modify the preceding code by letting reduce do the gathering and summation in one step. Here, the last lines of the preceding code are modified in this way:

......... modification of the script above .....
# Each processor reports its result back to the root
# and these results are summed up
total_dot = comm.reduce(partial_dot, op=MPI.SUM, root=0)

if rank == 0:
    print(f'The parallel scalar product of u and v'
          f' on {nprocessors} processors is {total_dot}. '
          f'The difference to the serial computation '
          f'is {abs(total_dot-u@v)}')

Other frequently applied reducing operations are the following (a short usage sketch follows this list):

  • MPI.MAX or MPI.MIN: The maximum or minimum of the partial results
  • MPI.MAXLOC or MPI.MINLOC: The argmax or argmin of the partial results
  • MPI.PROD: The product of the partial results
  • MPI.LAND or MPI.LOR: The logical and/logical or of the partial results
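
As a small sketch of one of these variants (a hypothetical example, not part of the scalar product script; the random data is arbitrary), the global maximum of data distributed over the processors can be obtained with op=MPI.MAX:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

local_data = np.random.rand(1000)       # each processor holds its own data
local_max = local_data.max()            # partial result on this processor
global_max = comm.reduce(local_max, op=MPI.MAX, root=0)

if rank == 0:
    print(f'Global maximum: {global_max}')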

Sending the same message to all

Another collective command is the broadcasting command bcast. In contrast to scatter it is used to send the same data to all processors. Its call is similar to that of scatter:

data = comm.bcast(data, root=0)

but it is the total data and not a list of portioned data that is sent. Again, the root processor can be any processor; it is the processor that prepares the data to be broadcast.
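
A minimal sketch of such a broadcast (the parameter names and values are hypothetical):

from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    parameters = {'n': 150, 'tol': 1.e-8}   # prepared only on the root
else:
    parameters = None                       # placeholder on all other processes
parameters = comm.bcast(parameters, root=0) # now every process has the same data
print(f'Rank {rank} works with {parameters}')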

Buffered data

In an analogous manner, mpi4py provides the corresponding collective commands for buffer-like data such as NumPy arrays by capitalizing the command: scatter/Scatter, gather/Gather, reduce/Reduce, bcast/Bcast.
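
For instance, here is a minimal sketch of Bcast for a NumPy array; as with Recv above, the receiving processes are assumed to allocate a buffer of matching size and datatype first:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    A = np.arange(700, dtype=float)    # the data exists only on the root
else:
    A = np.empty(700, dtype=float)     # buffer allocation on the other processes
comm.Bcast(A, root=0)                  # fills A in place on every process
print(f'Rank {rank}: last element {A[-1]}')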

18.4 Summary

In this chapter, we saw how to execute copies of the same script on different processors in parallel. Message passing allows communication between these processes. We saw point-to-point communication and the two collective communication patterns one-to-all and all-to-one. The commands presented in this chapter are provided by the Python module mpi4py, which is a Python wrapper around the MPI standard's C implementations.

Having worked through this chapter, you are now able to work on your own scripts for parallel programming and you will find that we described only the most essential commands and concepts here. Grouping processes and tagging information are only two of those concepts that we left out. Many of these concepts are important for special and challenging applications, which are far too particular for this introduction.
