Time to get into action! Let's solve our case study involving the Fibonacci series for multiple inputs using PP in the SMP architecture. I am using a notebook armed with a two-core processor and four threads.
We will import only two modules for this implementation, os
and pp
. The os
module will be used only to obtain a PID of the processes in execution. We will have a list called input_list
with the values to be calculated and a dictionary to group the results, which we will call result_dict
. Then, we go to the chunk of code as follows:
import os, pp input_list = [4, 3, 8, 6, 10] result_dict = {}
Then, we define a function called fibo_task
, which will be executed by parallel processes. It will be our func
argument passed by the submit
method of the Server
class. The function does not feature major changes in relation to previous chapters, except that the return is now done by using a tuple to encapsulate the value received in the argument and a message containing a PID and a calculated Fibonacci term. Take a look at the following complete function:
def fibo_task(value): a, b = 0, 1 for item in range(value): a, b = b, a + b message = "the fibonacci calculated by pid %d was %d" % (os.getpid(), a) return (value, message)
The next step is to define our callback
function, which we will call aggregate_results
. The callback
function will be called as soon as the fibo_task
function returns the result of its execution. Its implementation is quite simple and only shows a status message, generating afterwards an input in result_dict
, containing as a key the value passed to the fibo_dict
function, and as a result, the message returned by the process that calculated the Fibonacci term. The following code is the complete implementation of the aggregate_results
function:
def aggregate_results(result): print "Computing results with PID [%d]" % os.getpid() result_dict[result[0]] = result[1]
Now, we have two functions to be defined. We have to create an instance of the Server
class to dispatch the tasks. The following line of code creates an instance of Server
:
job_server = pp.Server()
In the preceding example, we used standard values for arguments. In the next section, we will make use of some available arguments.
Now that we have an instance of the Server
class, let's iterate each value of our input_list
, dispatching the fibo_task
function through the submit
call, passing as arguments to the input value in the args
tuple the module that needs to be imported so that the function is executed correctly and callback
registers aggregate_results
. Refer to the following chunk of code:
for item in input_list: job_server.submit(fibo_task, (item,), modules=('os',), callback=aggregate_results)
Finally, we have to wait till the end of all the dispatched tasks. Therefore, we can use the wait
method of the Server
class as follows:
job_server.wait()
In the end, we will iterate the results of the printing entries through our dictionary as follows:
print "Main process PID [%d]" % os.getpid() for key, value in result_dict.items(): print "For input %d, %s" % (key, value)
The following screenshot illustrates the output of the program:
18.190.160.221