Message passing between several workers

As mentioned earlier, our goal is to have a structure where several processes constantly execute tasks from a queue: whenever a process finishes executing one task, it picks up another. To do this, we will use a subclass of Queue called JoinableQueue, which provides the additional task_done() and join() methods, as described in the following list:

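  • task_done(): Called by a consumer to indicate that a task it previously retrieved from the calling JoinableQueue object (with get()) has been fully processed; it should be called exactly once per retrieved item
  • join(): This method blocks until every item that has been put into the calling JoinableQueue object has been retrieved and marked as done with task_done()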
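The counting behind these two methods can be observed without starting any child processes. The following minimal sketch (square_all is a hypothetical name, used only for illustration) puts a few items into a JoinableQueue, consumes them in the same process, and calls task_done() once per item; join() then returns because every put() has been matched by a task_done().

```python
import multiprocessing


def square_all(values):
    """Enqueue `values`, consume them in this same process, then wait on join()."""
    queue = multiprocessing.JoinableQueue()
    for value in values:
        queue.put(value)          # each put() adds one unfinished task

    results = []
    for _ in range(len(values)):
        item = queue.get()        # get() alone does not mark the task as finished...
        results.append(item * item)
        queue.task_done()         # ...task_done() does, once per retrieved item

    queue.join()                  # returns at once: no unfinished tasks remain
    return results


if __name__ == '__main__':
    print(square_all([1, 2, 3]))  # [1, 4, 9]
```

Calling task_done() more times than there were items raises a ValueError, while skipping a call leaves join() blocked forever, which is why a consumer should call it exactly once per get().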

Now the goal here, again, is to have a JoinableQueue object holding all the tasks that are to be executed—we will call this the task queue—and a number of processes. As long as there are items (messages) in the task queue, the processes will take their turn to execute those items. We will also have a Queue object to store all the results returned from the processes—we will call this the result queue.

Navigate to the Chapter06/example7.py file and take a look at the Consumer class and the Task class, as shown in the following code:

# Chapter06/example7.py

from math import sqrt
import multiprocessing

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        pname = self.name

        # keep taking tasks only while the queue appears to be non-empty
        while not self.task_queue.empty():

            temp_task = self.task_queue.get()

            print('%s processing task: %s' % (pname, temp_task))

            answer = temp_task.process()
            # tell the queue this task is finished, so that join() can return
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task():
    def __init__(self, x):
        self.x = x

    def process(self):
        if self.x < 2:
            return '%i is not a prime number.' % self.x

        if self.x == 2:
            return '%i is a prime number.' % self.x

        if self.x % 2 == 0:
            return '%i is not a prime number.' % self.x

        # trial division by odd numbers up to the square root of x
        limit = int(sqrt(self.x)) + 1
        for i in range(3, limit, 2):
            if self.x % i == 0:
                return '%i is not a prime number.' % self.x

        return '%i is a prime number.' % self.x

    def __str__(self):
        return 'Checking if %i is a prime or not.' % self.x

The Consumer class, a subclass of multiprocessing.Process that overrides its run() method, holds our processing logic; it takes in a task queue and a result queue. When started, each Consumer object repeatedly gets the next item from the task queue, executes it, calls task_done(), and puts the returned result into the result queue, stopping once the task queue appears to be empty. Each item in the task queue is in turn represented by the Task class, whose main functionality is to check whether its parameter is a prime number, by trial division over odd divisors up to its square root. As each Consumer instance processes a Task instance, it also prints out a message so that we can easily keep track of which consumer is executing which task.
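One consequence of this design is worth keeping in mind: a multiprocessing queue pickles every object passed to put(), and the consumer receives an unpickled copy, so Task objects (and their results) must be picklable, which in practice means defining the class at module level. The sketch below is a stand-in that reproduces the chapter's trial-division check (using math.isqrt in place of int(sqrt(...)), our own substitution) and simulates the trip through the queue with an explicit pickle round trip instead of starting real processes.

```python
import pickle
from math import isqrt


class Task:
    """Stand-in for the chapter's Task class: a trial-division prime check."""

    def __init__(self, x):
        self.x = x

    def process(self):
        if self.x < 2:
            return '%i is not a prime number.' % self.x
        if self.x == 2:
            return '%i is a prime number.' % self.x
        if self.x % 2 == 0:
            return '%i is not a prime number.' % self.x
        # only odd divisors up to the integer square root need to be tried
        for i in range(3, isqrt(self.x) + 1, 2):
            if self.x % i == 0:
                return '%i is not a prime number.' % self.x
        return '%i is a prime number.' % self.x


# put() pickles the object and get() unpickles it in the consumer process;
# this round trip mimics that hand-off within a single process
received = pickle.loads(pickle.dumps(Task(323)))
print(received.process())  # 323 is not a prime number.
```

Because pickle stores classes by reference (module and name), a Task class defined inside a function could not be sent through the queue at all.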

Let's move on and consider our main program, as shown in the following code:

# Chapter06/example7.py

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # spawning consumers with respect to the
    # number of cores available in the system
    n_consumers = multiprocessing.cpu_count()
    print('Spawning %i consumers...' % n_consumers)
    consumers = [Consumer(tasks, results) for i in range(n_consumers)]
    for consumer in consumers:
        consumer.start()

    # enqueueing jobs
    my_input = [2, 36, 101, 193, 323, 513, 1327, 100000, 9999999, 433785907]
    for item in my_input:
        tasks.put(Task(item))

    # wait until task_done() has been called for every enqueued task
    tasks.join()

    for i in range(len(my_input)):
        temp_result = results.get()
        print('Result:', temp_result)

    print('Done.')

As we said earlier, we create a task queue and a result queue in our main program. We also create a list of Consumer objects and start all of them; the number of processes created corresponds to the number of CPUs available in our system, as reported by multiprocessing.cpu_count(). Next, we wrap each value from a list of inputs, some of which require heavy computation in the Task class, in a Task object and put them all in the task queue. As the tasks arrive in the queue, our processes (our Consumer objects) start executing them.

Finally, at the end of our main program, we call join() on our task queue to ensure that all items have been executed and print out the result by looping through our result queue. After running the script, your output should be similar to the following:

> python example7.py
Spawning 4 consumers...
Consumer-3 processing task: Checking if 2 is a prime or not.
Consumer-2 processing task: Checking if 36 is a prime or not.
Consumer-3 processing task: Checking if 101 is a prime or not.
Consumer-2 processing task: Checking if 193 is a prime or not.
Consumer-3 processing task: Checking if 323 is a prime or not.
Consumer-2 processing task: Checking if 1327 is a prime or not.
Consumer-3 processing task: Checking if 100000 is a prime or not.
Consumer-4 processing task: Checking if 513 is a prime or not.
Consumer-3 processing task: Checking if 9999999 is a prime or not.
Consumer-2 processing task: Checking if 433785907 is a prime or not.
Result: 2 is a prime number.
Result: 36 is not a prime number.
Result: 193 is a prime number.
Result: 101 is a prime number.
Result: 323 is not a prime number.
Result: 1327 is a prime number.
Result: 100000 is not a prime number.
Result: 9999999 is not a prime number.
Result: 513 is not a prime number.
Result: 433785907 is a prime number.
Done.

Everything seems to be working, but if we look closely at the messages our processes have printed out, we will notice that most of the tasks were executed by either Consumer-2 or Consumer-3, and that Consumer-4 executed only one task while Consumer-1 failed to execute any. What happened here?

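The root of the problem is the loop condition in Consumer.run(): a consumer keeps working only while self.task_queue.empty() returns False, and it exits for good the first time the queue looks empty. Since our main program starts the consumers before it enqueues any tasks, a consumer that checks the queue before the first task has arrived terminates without executing anything, which is what happened to Consumer-1 in the run above. The multiprocessing documentation also warns that empty() is not reliable: items are transferred to the underlying pipe by a background thread, so a queue can briefly look empty even after put() has been called. The consumers that get past these early checks, here Consumer-2 and Consumer-3, keep finishing tasks and immediately picking up new ones, while Consumer-4 was only able to "squeeze" itself in once.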

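If you run the script several times, you will likely notice a similar trend: one or two consumers execute most of the tasks, while the others execute few or none. This situation is undesirable, since the program is not making use of all the processes it created at the beginning. The check-then-get pattern carries a second risk as well: if two consumers both see the last remaining task, one of them takes it and the other blocks in get() indefinitely, which can prevent the program from exiting.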
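The loop in example7's Consumer.run() stops as soon as empty() returns True. The sketch below isolates that loop shape in a hypothetical polling_consumer() function. To keep the result deterministic, it uses the thread-safe queue.Queue and runs everything in a single thread (an assumption of this sketch, not how example7 runs): calling the function before and after the producer enqueues its items stands in for two consumers that the operating system happens to schedule at different times.

```python
import queue


def polling_consumer(task_queue):
    """Same loop shape as example7's Consumer.run(): work only while the queue looks non-empty."""
    handled = []
    while not task_queue.empty():
        handled.append(task_queue.get())
        task_queue.task_done()
    return handled


tasks = queue.Queue()

# A consumer that checks before the producer has enqueued anything sees an
# empty queue and returns immediately; it never comes back for more work.
early = polling_consumer(tasks)

for n in [2, 36, 101]:
    tasks.put(n)

# A consumer that checks after the producer drains every task by itself.
late = polling_consumer(tasks)

print(early, late)  # [] [2, 36, 101]
```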

A common technique for addressing this issue is called the poison pill. Instead of polling the queue and quitting as soon as it looks empty, each consumer simply blocks on get() until an item is available, so no consumer can drop out early. After setting up the real tasks in the task queue, the main program also adds special "stop" values, one per consumer; a consumer that receives one of these values knows that there is no more work for it and exits, hence the name "poison pill."

To implement this technique, the main program needs to add these special objects to the task queue, one per consumer, and the Consumer class needs logic to recognize and handle them. Let's take a look at the example8.py file (a modified version of the previous example that implements the poison pill technique), specifically the Consumer class and the main program, as shown in the following code:

# Chapter06/example8.py

from math import sqrt
import multiprocessing

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        pname = self.name

        while True:
            # block until an item is available instead of polling empty()
            temp_task = self.task_queue.get()

            # poison pill: mark it as done, then stop this consumer
            if temp_task is None:
                print('Exiting %s...' % pname)
                self.task_queue.task_done()
                break

            print('%s processing task: %s' % (pname, temp_task))

            answer = temp_task.process()
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task():
    def __init__(self, x):
        self.x = x

    def process(self):
        if self.x < 2:
            return '%i is not a prime number.' % self.x

        if self.x == 2:
            return '%i is a prime number.' % self.x

        if self.x % 2 == 0:
            return '%i is not a prime number.' % self.x

        limit = int(sqrt(self.x)) + 1
        for i in range(3, limit, 2):
            if self.x % i == 0:
                return '%i is not a prime number.' % self.x

        return '%i is a prime number.' % self.x

    def __str__(self):
        return 'Checking if %i is a prime or not.' % self.x

if __name__ == '__main__':

    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # spawning consumers with respect to the
    # number of cores available in the system
    n_consumers = multiprocessing.cpu_count()
    print('Spawning %i consumers...' % n_consumers)
    consumers = [Consumer(tasks, results) for i in range(n_consumers)]
    for consumer in consumers:
        consumer.start()

    # enqueueing jobs
    my_input = [2, 36, 101, 193, 323, 513, 1327, 100000, 9999999, 433785907]
    for item in my_input:
        tasks.put(Task(item))

    # one poison pill per consumer, after all of the real tasks
    for i in range(n_consumers):
        tasks.put(None)

    tasks.join()

    for i in range(len(my_input)):
        temp_result = results.get()
        print('Result:', temp_result)

    print('Done.')

The Task class remains the same as in our previous example. Our poison pill is the None value: in the main program, we put as many None values into the task queue as there are consumers; in the Consumer class, if the task just retrieved is None, the consumer prints out a message indicating that it has received the poison pill, calls task_done() (the pills were added with put(), so join() also waits for them to be marked as done), and exits its loop.
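None is a convenient choice of pill for another reason. Because every object sent through a multiprocessing queue is pickled and unpickled, the consumer receives a copy rather than the original object. None is a singleton that unpickles to itself, so the `is None` test in run() keeps working across processes, whereas a custom sentinel such as `object()` would not keep its identity. The sketch below shows this with a pickle round trip in a single process; STOP and through_queue() are hypothetical names used only for the comparison.

```python
import pickle

STOP = object()  # hypothetical custom sentinel, for comparison only


def through_queue(obj):
    """Simulate the pickle round trip that multiprocessing queues perform."""
    return pickle.loads(pickle.dumps(obj))


print(through_queue(None) is None)  # True: None stays the same singleton
print(through_queue(STOP) is STOP)  # False: the consumer would get a new object()
```

If a sentinel other than None is preferred, comparing by value (for example, a dedicated string checked with ==) avoids relying on object identity.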

Run the script; your output should be similar to the following:

> python example8.py
Spawning 4 consumers...
Consumer-1 processing task: Checking if 2 is a prime or not.
Consumer-2 processing task: Checking if 36 is a prime or not.
Consumer-3 processing task: Checking if 101 is a prime or not.
Consumer-4 processing task: Checking if 193 is a prime or not.
Consumer-1 processing task: Checking if 323 is a prime or not.
Consumer-2 processing task: Checking if 513 is a prime or not.
Consumer-3 processing task: Checking if 1327 is a prime or not.
Consumer-1 processing task: Checking if 100000 is a prime or not.
Consumer-2 processing task: Checking if 9999999 is a prime or not.
Consumer-3 processing task: Checking if 433785907 is a prime or not.
Exiting Consumer-1...
Exiting Consumer-2...
Exiting Consumer-4...
Exiting Consumer-3...
Result: 2 is a prime number.
Result: 36 is not a prime number.
Result: 323 is not a prime number.
Result: 101 is a prime number.
Result: 513 is not a prime number.
Result: 1327 is a prime number.
Result: 100000 is not a prime number.
Result: 9999999 is not a prime number.
Result: 193 is a prime number.
Result: 433785907 is a prime number.
Done.

This time, as well as seeing the poison pill messages being printed out, the output also shows a significantly better distribution in terms of which consumer executed which task.
