Dask

So far, everything we've run was run on one CPU, sequentiallywith the exception of some ML models and transformations, which support the number of jobs (parallel executors); for example, cKDTree supports multiprocessing, if needed.

The caveat here is the overheadin order to run a multicore process, a lot of additional memory needs to be allocated and data needs to be copied; it is essentially a fixed cost. Because of that, most of the tasks we ran wouldn't benefit from multiple cores, except for cases where data is very large and computations are fairly parallelized. On the flip side, once we run a task on multiple cores, spreading it across multiple machines is simple.

While the most typical task for Dask to deal with is heavy computation on multiple cores or machines, it also allows you to run computations nicely on data that wouldn't fit in a computer's memory (by loading and operating chunks of data, one per core, at a time). Thus, in theory, it can be used to run some analysis on small IoT devicesespecially given that it also supports streaming.

Most of the time, using multiple coresor multiple machineswill not boost the computations you're able to run on the local machine, loading all the data in memory. However, if your data is big enough, you have to use chunks, and computation will take hours to run. Due to this, using distributed computation could be your only choice (obviously, assuming the bottleneck is not bad code).

For that, we need to introduce Daska system that allows you to run heavy computations with big datasets on multiple cores of one machine, or on a cluster of machines. The best part (for us) of Dask is that it emulates the behavior of Pandas or NumPy on the surface. In many cases, Dask's dataframe can be used as if it was a Pandas dataframeexcept that it is spread across cores and machines. One big difference in using Dask is that no computation is executed until you ask it to compute.

Let's try pulling the same data we were using for 311 predictions. Since Dask is meant to be used with large datasets and multiple files, it can handle path patternswe don't need to glob explicitly (also, it can glob on the S3 bucket and read from there). To do so, we'll import the dask dataframe, specify a path, pattern, using an asterisk (wildcard) to identify parts of the path that vary. Finally, we will use the read_csv method to read those, just like we'd do with pandas (we do this because Dask runs Pandas' read_csv method under the hood here). Setting blocksize to None here explicitly makes Dask use one worker (core) per file. We also explicitly set Dask to use processes (multicore) scheduler. Here's what this looks like in code:

from dask import dataframe as dd
import dask
dask.config.set(scheduler='processes')

As you will notice, the code won't take long to executethis is because it didn't actually run anything. For now, df is just a schedule object that will execute once we call a compute() method. Let's continue coding as if it was a dataframe:

df = df[df.complaint_type.str.lower().str.contains('noise')]
cols = ['x_coordinate_state_plane', 'y_coordinate_state_plane', 'created_date', 'closed_date', 'complaint_type', 'open_data_channel_type']

df = dg.dropna(subset=cols)

X = df[['x_coordinate_state_plane', 'y_coordinate_state_plane']]
X['dow'] = df['created_date'].dt.dayofweek
X['hour'] = df['created_date'].dt.dayofweek
X['doy'] = df['created_date'].dt.dayofyear

Like before, the code didn't take long to executefor the same reason. The tasks are combining, though, and are forming a directed graph. We can cross-check that graph as follows:

X.visualize(filename='chart.png')

This is what we'll get:

Here, each separate graph represents a chunk that could live on a separate CPU, while each node represents an operation. It is very useful to cross-check those graphs, especially for complex operations (think groupby and similar). Once you're ready, hit compute:

data = X.compute()

The best part is that while the preceding code will execute on the local machine, it is easy to deploy a cluster of machines on the cloud. Once that is done, Dask can be configured to spread your computation to those machines, with no changes needing to be made to the code on Dask's side (obviously, it will benefit from having data stored in storage that's accessible to all the machines in the cluster).

Dask is a Python-based framework for big data computation. Its more famous alternative is Spark, and the PySpark package for Python. Spark is a great tool and can scale easily. At the same time, the core code of this technology is written in Java, and so you'll have to be prepared to debug Java code. Dask, on the other hand, is 100% Python and has familiar APIs, so you won't need to change that much code.

For more information on Dask, take a look at Scalable Data Analysis with Dask, by Mohammed Kashif: https://www.packtpub.com/web-development/scalable-data-analysis-python-dask-video

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.139.105.83