Multiprocess pooling

One of the most useful features of the multiprocessing library is pooling. A pool lets us distribute tasks across the processor cores without having to manage how many processes are active at any one time. In effect, the module runs the work in batches. Let's say that we define the batch size as 4, which is the number of processor cores we may have. At any time, at most four processes are executing. When one of them completes, leaving three running, the module automatically picks up the next pending task to bring the batch back to four. This continues until either the whole distributed task is finished or a stopping condition that we define explicitly is met.
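The batching behaviour described above can be observed with a small sketch. The report_worker() task and the run_tasks() helper are hypothetical names chosen for illustration, and the pool size of four is an assumption that mirrors the four-core machine in the text. Each task reports the ID of the process that ran it, so we can check that eight tasks are served by no more than four processes:

```python
import multiprocessing as mp
import os
import time


def report_worker(task_id):
    """Hypothetical task: pause briefly, then report which process ran it."""
    time.sleep(0.1)  # stand-in for real work
    return task_id, os.getpid()


def run_tasks(num_tasks=8, pool_size=4):
    """Run num_tasks tasks on a pool of pool_size processes.

    Returns a list of (task_id, pid) pairs in task order.
    """
    with mp.Pool(processes=pool_size) as pool:
        return pool.map(report_worker, range(num_tasks))


if __name__ == "__main__":
    results = run_tasks()
    for task_id, pid in results:
        print(f"task {task_id} ran in process {pid}")
    # Eight tasks, but never more than four distinct processes.
    print("distinct processes:", len({pid for _, pid in results}))
```

With the default settings, a pool keeps its processes alive and hands each one the next pending task as soon as it becomes free, which is why the count of distinct process IDs never exceeds the pool size.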

Take a look at the following example, where we are required to write 8 million records to eight different files (1 million records in each file). We have a four-core processor to carry out this task. Ideally, we spawn a batch of four processes twice, so that each process writes 1 million records to its own file. Since we have four cores, we want each core to carry out a different part of the task. If we chose to spawn all eight processes together, we would waste time on context switching, so we need to use our processing capacity wisely to get the maximum throughput:

In the preceding code, Multiprocess_pool.py, we create a multiprocessing pool at line 30. We define the size of the pool as size=mp.cpu_count(), which in our case is 4, so the pool holds four processes. We need to create eight files, each holding 1 million records. We use a for loop to define eight tasks, which are sent to the pool by invoking apply_async() on the pool object. The first argument to apply_async() is the function that we wish to execute in a pool process. The second argument is a tuple of the parameters to pass to that function. Note that a function executed through the pool can also return data to the caller.
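The pattern described above can be illustrated with a minimal sketch. This is not the Multiprocess_pool.py listing itself: the write_records() and write_all() functions, the file names, and the record layout are all assumptions made for illustration, and the record count is scaled down so that the sketch runs quickly.

```python
import multiprocessing as mp
import os
import tempfile


def write_records(file_index, num_records, out_dir):
    """Hypothetical worker: write num_records lines to one file.

    Returns the file path and the number of records written.
    """
    path = os.path.join(out_dir, f"records_{file_index}.txt")
    start_id = file_index * num_records
    with open(path, "w") as handle:
        for record_id in range(start_id, start_id + num_records):
            handle.write(f"{record_id},record_{record_id}\n")
    return path, num_records


def write_all(num_files, num_records, out_dir, pool_size=None):
    """Submit one task per file with apply_async() and collect the results."""
    with mp.Pool(processes=pool_size or mp.cpu_count()) as pool:
        pending = [
            pool.apply_async(write_records, (index, num_records, out_dir))
            for index in range(num_files)
        ]
        # get() blocks until that task finishes and returns its value.
        return [result.get() for result in pending]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        # Scaled down from 1 million records per file.
        for path, count in write_all(8, 1000, out_dir):
            print(f"wrote {count} records to {os.path.basename(path)}")
```

Each call to apply_async() returns immediately with a result handle, so all eight tasks are queued at once, and the pool decides how many run at any moment. Calling get() on a handle is how the value returned by the function reaches the parent process.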

As can be seen from the output, at no time are more than four processes executing simultaneously. It can also be verified that the first process to finish is Forkpoolworker4. As soon as the number of running processes drops to three, the module immediately starts the next task. This can be verified from the output, which states Started process Poolworker4 on the sixth line of section (1).

Note that the processes within each batch execute in parallel, while the two batches run one after the other. Each process took 13 to 14 seconds, but since the four processes in a batch ran in parallel, one on each core, each batch took about 14 seconds overall. For two batches, therefore, the total time was about 28 seconds. By using parallelism, we solved our problem in just 28 seconds. Had we gone for a sequential or threaded approach, the total time would have been close to 13 * 8 = 104 seconds. Try it yourself as an exercise.
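The arithmetic behind these figures can be written out as a rough estimate. The helper names below are hypothetical, and the model assumes that every task takes the same time and that the overhead of starting processes is negligible, which is only approximately true in practice:

```python
import math


def pooled_time(num_tasks, pool_size, seconds_per_task):
    """Estimate wall-clock time when tasks run in batches of pool_size."""
    batches = math.ceil(num_tasks / pool_size)
    return batches * seconds_per_task


def sequential_time(num_tasks, seconds_per_task):
    """Estimate wall-clock time when tasks run one after another."""
    return num_tasks * seconds_per_task


if __name__ == "__main__":
    print(pooled_time(8, 4, 14))   # 2 batches x 14 s = 28
    print(sequential_time(8, 13))  # 8 tasks x 13 s = 104
```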

Now let's take another example, to show another dimension of the power of the pool module. Let's say that, as part of our requirements, we need to parse four of the eight files that were created and pick out the records whose ID % 1700 yields zero. We must then combine the results from all four files in a separate file. This is a very good example of distributed processing and aggregation of results: the processes must not only read the files in parallel, they must also aggregate the results. It is somewhat similar to Hadoop's MapReduce model. In a typical map-reduce problem, there are two sets of operations:

  • Map: This involves splitting a huge dataset across various nodes in a distributed system. Each node processes the chunk of data it receives.
  • Reduce: This is the aggregation operation, where the output of the map phase from each node is returned, and, depending on the logic, the results are finally aggregated and given back.

We are doing the same thing here, the only difference being that we are using processor cores in place of the nodes:

As can be seen in the preceding code snippet, with the help of the map() method of the Pool class, we can make multiple processes work on different files in parallel, then combine all the results and return them as a single structure. The processes execute in parallel, and the records for which record_id % 1700 is zero are returned to us. Finally, we save the aggregated result in the Modulo_1700_agg file. This is a very powerful feature of the multiprocessing module and, used properly, can reduce processing and aggregation time by a large margin.
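The map-and-aggregate flow described above can be sketched as follows. The find_matches() and aggregate() functions, the comma-separated record layout, and the small sample files are assumptions made for illustration and are not taken from the original listing; only Pool.map() and the Modulo_1700_agg file name come from the text.

```python
import multiprocessing as mp
import os
import tempfile


def find_matches(path, divisor=1700):
    """Map step (hypothetical): return the IDs in one file divisible by divisor."""
    matches = []
    with open(path) as handle:
        for line in handle:
            record_id = int(line.split(",", 1)[0])
            if record_id % divisor == 0:
                matches.append(record_id)
    return matches


def aggregate(paths, out_path, pool_size=None):
    """Scan every file in parallel, then merge the matches into one file."""
    with mp.Pool(processes=pool_size or mp.cpu_count()) as pool:
        # map() returns one list per input file, in the order of paths.
        per_file = pool.map(find_matches, paths)
    # Reduce step: flatten the per-file lists into a single result.
    combined = [record_id for matches in per_file for record_id in matches]
    with open(out_path, "w") as handle:
        handle.writelines(f"{record_id}\n" for record_id in combined)
    return combined


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as work_dir:
        paths = []
        for index in range(4):  # small sample files standing in for the real ones
            path = os.path.join(work_dir, f"records_{index}.txt")
            with open(path, "w") as handle:
                for record_id in range(index * 5000, (index + 1) * 5000):
                    handle.write(f"{record_id},record_{record_id}\n")
            paths.append(path)
        out_path = os.path.join(work_dir, "Modulo_1700_agg")
        print(aggregate(paths, out_path))
```

Because map() blocks until every file has been processed and preserves the order of its inputs, the reduce step here is a simple flattening of the returned lists in the parent process.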
