So far, everything we've run has executed sequentially, on a single CPU, with the exception of some ML models and transformations that accept a number of jobs (parallel executors); for example, cKDTree can run its queries over multiple processes, if needed.
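As a quick illustration (a sketch, not part of our pipeline), here is how a cKDTree query can fan out over all available cores; the random points are made up for the example, and note that older SciPy releases call the workers parameter n_jobs:

```python
import numpy as np
from scipy.spatial import cKDTree

points = np.random.rand(1_000, 2)  # synthetic 2D points
tree = cKDTree(points)

# workers=-1 asks SciPy to use all available cores
# (older SciPy versions call this parameter n_jobs)
dist, idx = tree.query(points, k=2, workers=-1)
print(idx.shape)  # (1000, 2): each point's self plus its nearest neighbor
```

For small datasets like this one, the multiprocessing overhead usually outweighs the gain, which is exactly the caveat discussed next.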
The caveat here is the overhead: in order to run a multicore process, a lot of additional memory needs to be allocated and data needs to be copied, which is essentially a fixed cost. Because of that, most of the tasks we ran wouldn't benefit from multiple cores, except for cases where the data is very large and the computations are highly parallelizable. On the flip side, once a task runs on multiple cores, spreading it across multiple machines is simple.
Most of the time, using multiple cores (or multiple machines) will not speed up computations you can already run on the local machine with all the data loaded in memory. However, if your data is too big for that, you have to process it in chunks, and the computation can take hours to run. In that case, distributed computation could be your only choice (obviously, assuming the bottleneck is not bad code).
For that, we need to introduce Dask, a system that allows you to run heavy computations on big datasets using multiple cores of one machine, or a cluster of machines. The best part of Dask (for us) is that, on the surface, it emulates the behavior of pandas and NumPy. In many cases, a Dask dataframe can be used as if it were a pandas dataframe, except that it is spread across cores and machines. One big difference is that no computation is executed until you explicitly ask Dask to compute.
Let's try pulling the same data we were using for the 311 predictions. Since Dask is meant to be used with large datasets spread over multiple files, it can handle path patterns, so we don't need to glob explicitly (it can even glob an S3 bucket and read from there). To do so, we'll import the Dask dataframe module and specify a path pattern, using an asterisk (wildcard) for the parts of the path that vary. Finally, we'll use the read_csv method to read those files, just as we would with pandas (indeed, Dask runs pandas' read_csv method under the hood here). Setting blocksize to None explicitly makes Dask create one partition (chunk) per file, rather than splitting files further. We also explicitly tell Dask to use the processes (multicore) scheduler. Here's what this looks like in code:
from dask import dataframe as dd
import dask
dask.config.set(scheduler='processes')
# the path below is hypothetical; adjust the pattern to your file layout.
# we parse the date columns up front so we can use the .dt accessor later
df = dd.read_csv('data/311/*.csv', blocksize=None,
                 parse_dates=['created_date', 'closed_date'])
As you will notice, the code won't take long to execute; that's because it didn't actually run anything yet. For now, df is just a task graph that will be executed once we call the compute() method. Let's continue coding as if it were a dataframe:
df = df[df.complaint_type.str.lower().str.contains('noise')]
cols = ['x_coordinate_state_plane', 'y_coordinate_state_plane', 'created_date', 'closed_date', 'complaint_type', 'open_data_channel_type']
df = df.dropna(subset=cols)
X = df[['x_coordinate_state_plane', 'y_coordinate_state_plane']]
X['dow'] = df['created_date'].dt.dayofweek
X['hour'] = df['created_date'].dt.hour
X['doy'] = df['created_date'].dt.dayofyear
As before, the code didn't take long to execute, for the same reason. The tasks are accumulating, though, forming a directed graph. We can inspect that graph as follows:
X.visualize(filename='chart.png')
This is what we'll get:
Here, each separate subgraph represents a chunk (partition) that could live on a separate CPU, while each node represents an operation. It is very useful to inspect those graphs, especially for complex operations (think groupby and similar). Once you're ready, hit compute:
data = X.compute()
The best part is that while the preceding code executes on the local machine, it is easy to deploy a cluster of machines in the cloud. Once that is done, Dask can be configured to spread your computation across those machines, with no changes to the code on Dask's side (it will, of course, benefit from having the data in storage that's accessible to all the machines in the cluster).
For more information on Dask, take a look at Scalable Data Analysis with Dask, by Mohammed Kashif: https://www.packtpub.com/web-development/scalable-data-analysis-python-dask-video