© The Author(s), under exclusive license to APress Media, LLC, part of Springer Nature 2022
T. Sarkar, Productive and Efficient Data Science with Python, https://doi.org/10.1007/978-1-4842-8121-5_9

9. Scalable Data Science

Tirthajyoti Sarkar, Fremont, CA, USA

Data science tasks may encounter a wide variety of dataset sizes, ranging from kilobytes to petabytes. Some business spreadsheets will only have a few hundred rows while a whole factory may send a deluge of sensor data to a single dataset, resulting in billions of rows per day or even per hour. Some datasets can have many rows and a small number of columns, while others may consist of a few rows but millions of columns as feature dimensions. Even within the same organization or a data science team there can be multiple pipelines dealing with different types of input, and they may be facing a wide variation in the dataset size and complexity.

It is a natural practice for data scientists to build a scaled-down prototype of a data science job (for example, one combining data wrangling, ML algorithms, and some prediction functions), test it with a typical dataset that is expected to hit the pipeline, evaluate the result or measure a performance metric with a few ML algorithms, tune them, and finally make a choice. This is an experimental mentality, and it serves the spirit of doing science with data very well. However, to support this kind of quick analysis and prototyping, a data scientist must be able to scale across a wide variety of dataset sizes and complexity as the need arises. They should not run into issues like running out of memory while prototyping on their laptop.

This chapter talks about the common problems and limitations that arise while scaling out to larger datasets and what tools are out there to address these issues. Specifically, you will visit some of the limitations that arise while doing analysis with large datasets using the most common data analysis library, Python pandas, and explore two alternative libraries or add-ons that can be used to overcome these limitations.

In fact, scalability is closely related to the ability to do parallel processing of large data. Therefore, this theme will be continued in the next chapter where you will explore Python libraries that support parallel processing natively for data science tasks.

Common Problems for Scalability

Python is a great language for data science. Libraries like pandas open up myriad possibilities for data scientists to slice and dice data any way they like and create meaningful insights and high-impact analytics reports with a relatively small amount of programming. However, these libraries have some serious limitations when it comes to dealing with large datasets, even ones as simple as a CSV file with a billion rows.

Two of the most common issues that a data scientist may encounter as the scale of the data grows are out-of-core failures and inefficiencies related to Python's single-threaded execution model.

Out-of-core really means the inability to load the full dataset into the working memory (RAM) of the machine. Single threading is related to a fundamental Python design feature, the Global Interpreter Lock (GIL) (https://realpython.com/python-gil/), which allows a single thread to put a lock on the interpreter so that other threads cannot get hold of it. Together, they can make efficient data analysis on large datasets (anything larger than a few gigabytes) with limited hardware quite tricky.

Out-of-Core (a.k.a. Out of Memory)

pandas is the most popular data analysis library in Python, and it is at the front end of any standard data science pipeline. However, if you have ever tried to work with data files larger than a few GB, you may have seen the memory error that is thrown by pandas (Figure 9-1).
Figure 9-1

A memory error thrown by pandas

Of course, this error depends on the exact state of the system memory such as how many other processes are running alongside the pandas code and what type of memory they are blocking. Nonetheless, it is a well-known fact that pandas cannot handle multi-GB datasets (no matter how simple in structure they may be) efficiently.

Furthermore, this inefficiency and limitation can rear its ugly head even with a large dataset that could be somehow loaded in the memory without any memory error at the beginning. Due to the way pandas handles in-memory objects and calculations, it is quite easy to run into the same memory error in your data science code. This can be exacerbated by code that produces many large in-memory DataFrames in quick succession with intermediate calculations.

For example, imagine what the following code can do. Let's assume that Large-file.csv has 10 million rows and 20 columns.
import pandas as pd

df1 = pd.read_csv("Large-file.csv")  # Successful
df2 = df1.dropna()
df3 = df2[(df2['col1'] > 10) & (df2['col2'] < 20)]  # Boolean masks need & with parentheses, not 'and'
def complex_calc(x):
    # Some complex math on a single value (placeholder)
    return x ** 2
df3['new-col'] = df3['col3'].apply(complex_calc)
def some_transformation(df):
    # Transformation code producing yet another DataFrame (placeholder)
    transformed_df = df.copy()
    return transformed_df
df4 = some_transformation(df3)
...
This is a generic code snippet, but you get the idea: the code is inefficient, particularly when dealing with large pandas DataFrame objects. It produces multiple intermediate DataFrames and does not purge them from memory when their job is done. In the end, it may use only the final DataFrame for machine learning modeling, but the system memory is already clogged with so many useless objects that the result is a memory error and the whole pipeline crashes. This is illustrated in Figure 9-2.
Figure 9-2

A memory error produced by too many intermediate DataFrames (bad coding practice) even when a large file could be read from the disk

Of course, one way to get around this issue is to rigorously maintain a good coding habit in which unused objects are tracked and purged regularly. However, while prototyping in their Jupyter notebooks, data scientists are bound to write quick-and-dirty code without following this practice, and this will hinder their scalability options with large datasets.
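As a minimal sketch of that habit (reusing the hypothetical Large-file.csv and column names from the earlier snippet), you can delete intermediate DataFrames as soon as they are no longer needed and nudge the garbage collector to reclaim the memory:
import gc
import pandas as pd

df1 = pd.read_csv("Large-file.csv")
df2 = df1.dropna()
del df1                    # the raw DataFrame is no longer needed
df3 = df2[(df2['col1'] > 10) & (df2['col2'] < 20)]
del df2                    # nor is the intermediate one
gc.collect()               # trigger a collection pass so the freed objects are reclaimed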

Python Single Threading

The GIL was one of the earlier design choices in the Python language, and it solved quite a few important problems related to memory leaks and race conditions. Put simply, it is a locking mechanism that allows only one thread to hold control of the Python interpreter. This means that only one thread can be in a state of execution at any given point in time.

Generally, its impact isn’t visible to programmers executing single-threaded programs. In fact, many data science tasks can run just fine without worrying about the GIL, as they execute a series of tasks one after another and do not employ many parallel processing tricks. However, it can become a performance bottleneck in CPU-bound and multi-threaded code.
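Here is a small illustrative experiment (not tied to any particular dataset) that shows the bottleneck: a CPU-bound function is timed twice serially and then on two threads. On standard CPython the two timings come out roughly the same, because only one thread can execute Python bytecode at a time.
import time
from threading import Thread

def cpu_bound_task(n):
    # A purely CPU-bound loop; threads gain nothing here because of the GIL
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 10_000_000

t1 = time.time()
cpu_bound_task(N)
cpu_bound_task(N)
serial_time = time.time() - t1

t1 = time.time()
threads = [Thread(target=cpu_bound_task, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_time = time.time() - t1

print(f"Serial: {serial_time:.2f} s, Two threads: {threaded_time:.2f} s")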

For larger datasets, sometimes it makes sense to divide the data into multiple chunks and utilize a parallel processing execution pipeline. The idea is to send the chunked data to each core of the CPU and execute the analysis as much in parallel as possible. When the executions are done, the results can be combined to get back a transformed dataset. While this does not necessarily help to fit a larger dataset in memory, it can make analysis of the same dataset faster by the parallel execution.

The beauty here is that this approach can speed up data science exploration and prototyping tasks even without paying for large CPU clusters on the cloud. It is really a matter of taking advantage of the 8 or 16 cores that routinely come with a single modern-day CPU inside a data scientist’s laptop. However, you must make sure that the data science code and libraries are not getting in the way and that you are using libraries that can take full advantage of the multi-core hardware platform.
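A rough sketch of the chunk-and-parallelize pattern described above, using only the standard library's multiprocessing module and the hypothetical Large-file.csv from before (the per-chunk transformation is a placeholder), might look like this:
import pandas as pd
from multiprocessing import Pool

def transform_chunk(chunk):
    # Placeholder per-chunk wrangling; replace with real logic
    return chunk.assign(col1_squared=chunk['col1'] ** 2)

if __name__ == "__main__":
    # Stream the CSV in 1-million-row chunks and process them on 4 worker processes
    chunks = pd.read_csv("Large-file.csv", chunksize=1_000_000)
    with Pool(processes=4) as pool:
        results = pool.map(transform_chunk, chunks)
    # Combine the transformed chunks; as noted above, the combined
    # result must still fit in memory on its own
    transformed = pd.concat(results, ignore_index=True)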

What Options Are Out There?

To solve the memory issue (while loading and transforming large datasets) there are many possible solutions depending on the situation you are in. Some are related to your choice of hardware and some have to do with your data loading strategy. Let’s talk about them in a systematic manner.

Cloud Instances

For larger and larger datasets, there is always the brute-force solution of renting a cloud instance with a large amount of RAM attached. As an example, these days you can rent an AWS (Amazon Web Services) Elastic Compute Cloud (EC2) instance with 128GB of RAM for less than a dollar per hour. Figure 9-3 shows the pricing for an r6g.4xlarge instance (a so-called memory-optimized EC2 instance, www.amazonaws.cn/en/ec2/instance-types/#Memory_Optimized_Instances).
Figure 9-3

A memory-optimized EC2 (AWS) instance pricing

Once set up, you can install all your favorite Python libraries, read large data files stored locally (e.g., on a mapped SSD) or from an AWS S3 bucket, and do pandas data transformations without worrying about memory errors. While it may still seem expensive to a casual user, organizations or teams who need that much memory to process pandas DataFrames regularly probably won’t mind paying ~$0.8 an hour for a smooth and error-free data science task flow.

However, remember that pandas will still be limited to using only one CPU core at a time and, by default, will be slow while loading and dealing with large files. Just running run-of-the-mill pandas code on a large-memory cloud instance may prevent some frequent memory-error situations, but it does not fundamentally make the data science pipeline productive or efficient at scale.

What is a memory-optimized EC2 instance?

A cloud service like AWS must cater to a wide variety of users with various needs. Someone may need fast processing with a CPU cluster, someone else may need a high network bandwidth, and someone else may require large on-board memory (RAM). Memory-optimized instances are just that: they provide a large amount of RAM at an optimized cost. They do not necessarily have the best-in-class CPUs or network bandwidth, but they work best for jobs that demand large slices of physical memory during execution. Within these instances, there are multiple choices depending on cost and available CPU types. The r6g.4xlarge is really the starting point of this lineup that goes up to a 768GB memory option with a reasonable hourly cost.

AWS is not the only cloud service to offer this. Every major player, such as Google Cloud or Microsoft Azure, offers similar high-memory instances as Infrastructure-as-a-Service (IaaS) that can address the problem of insufficient memory while executing a data science task (on a local machine).

Google Colab

Google Colaboratory (or Colab, as it is known popularly; https://research.google.com/colaboratory/faq.html) is also a cloud service at its core. Basically, it runs a Jupyter notebook service that is hosted on Google cloud servers. You can use a CPU, GPU, or even a TPU (if you are lucky) for free just by having a Google account.

The greatest advantage of Colab, compared to AWS or GCP, is its ease of use and low barrier to entry. If you have your data science Python code in a Jupyter notebook, Colab can get you started on a cloud instance instantly (as soon as you upload your notebook to the instance). Unlike bare-bones AWS or GCP instances, there is no setup or installation needed. You can directly access Colab notebooks through your browser and start running your code in a matter of minutes.

What is a Tensor Processing Unit?

Tensor Processing Units (TPUs) are Google’s custom-developed application-specific integrated circuits (ASICs) used to accelerate machine learning workloads. These ASICs are designed from the ground up with the sole aim of optimizing the speed and power efficiency of the computation tasks that appear in deep learning, such as matrix multiplication and addition, special activation functions, and other linear algebra routines like matrix inversion. Their internal architecture is quite different from that of traditional CPUs, which are designed for general-purpose computing tasks. The memory bandwidth and memory transfer speed of a TPU are also enhanced, as these factor critically into deep learning training performance.

For some specific situations, this may indeed increase productivity and efficiency. For example, if the local laptop does not have a good enough CPU or a GPU card installed, or the RAM is under 8GB, then switching to Google Colab should enhance the productivity instantly.

The typical (free) instance has ~12-13 GB of RAM and a CPU equivalent to an Intel Xeon processor. Getting a GPU instance is also quite easy, with the most common GPU being a Tesla K80 (compute capability 3.7, 2,496 CUDA cores, and 12GB of GDDR5 VRAM). While the CPU core count is nothing to boast about, having larger RAM and GPU memory may help data science exploration, especially if it involves GPU-intensive tasks like training a deep neural network or even vectorized computation involving NumPy arrays. If 12GB of RAM seems too little, you can upgrade to Colab Pro (https://colab.research.google.com/signup), which offers double the RAM for only $10/month (a whole lot cheaper than paying for an equivalent EC2 instance with 24GB of RAM).
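As a quick sanity check (a small sketch, assuming the psutil package is available in your Colab runtime, which it usually is), you can confirm how much RAM your current instance actually has:
import psutil

# Report total and currently available RAM in gigabytes
mem = psutil.virtual_memory()
print(f"Total RAM: {mem.total / 1e9:.1f} GB, available: {mem.available / 1e9:.1f} GB")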

However, despite its attractive features, Colab does have some serious limitations for practicing data scientists who are trying to explore larger datasets and scale up their data science workflow. To start with, it puts a time limit on the running time of the notebook, and if you leave it idle for a certain amount of time, the instance will die (along with any variables and internal state). Basically, you must plan your code execution carefully and be ever vigilant to take full advantage of Colab.

Also, file loading (whether uploading from a local drive or reading from the Web) is painfully slow (most probably a deliberate choice to control bandwidth usage over the Google Cloud infrastructure). Therefore, while you can do in-memory analytics and data transformations rather quickly, the initial loading can take an inordinate amount of time or may even crash your notebook. Upgrading to Colab Pro or Pro+ (from a completely free account) alleviates these issues to some extent but not fully.

pandas-Specific Tricks

Since I started the scalability discussion by pointing out the out-of-core issues in pandas, it makes sense to loop back to ground zero and examine what suggestions the pandas developers have to address this issue.

There is a dedicated resource page on the pandas documentation portal about this topic: “Scaling to large datasets” (https://pandas.pydata.org/pandas-docs/stable/user_guide/scale.html). It starts like this:

Pandas provides data structures for in-memory analytics, which makes using pandas to analyze datasets that are larger than memory datasets somewhat tricky. Even datasets that are a sizable fraction of memory become unwieldy, as some pandas operations need to make intermediate copies.

It goes on to point out some useful tricks and techniques for coping with memory issues. I discuss some of them below and add a few more.

Load Only the Columns You Need

Often, a particular data transformation task requires only a small fraction of the columns in the complete dataset. If you have a dataset with 10 million rows and 100 columns, and you need only the first 5 columns, it makes absolute sense to load only those 5 and not even look at the rest. You avoid loading a whopping 950 million data values into memory. The essential trick here is to include the right argument in your data loading function.

Write
df = pd.read_csv("Large-file.csv",
                 usecols=['Col-1', 'Col-2', 'Col-3'])
instead of
df = pd.read_csv("Large-file.csv")

This little change can indeed make or break your data transformation pipeline.

Column-Specific Functions (If Applicable)

Following the same idea as above, it is a good practice (wherever applicable) to write separate functions that deal with specific columns/features in the dataset as needed (Figure 9-4); a code sketch follows the figure. For example, a dataset may have the following:
  • String data corresponding to name and address. This can be handled by a specific function.

  • Datetime data corresponding to some business transaction. This should be handled by another specific function that loads and processes only these columns.

  • Pure numeric data, which can be handled in many ways, even read as a pure NumPy array, utilizing vectorization tricks (as discussed elsewhere in this book) to speed up the data transformation process.

Figure 9-4

Functions to deal with specific columns of a large on-disk file, never loading more than a small fraction into memory
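Here is a minimal sketch of this pattern. The file path and column names are hypothetical; each function loads only the columns it needs via the usecols argument, so no more than a small slice of the on-disk file ever sits in memory at once.
import pandas as pd
import numpy as np

def process_text_columns(path):
    # Load and clean only the string columns
    text_df = pd.read_csv(path, usecols=['name', 'address'])
    return text_df.apply(lambda col: col.str.strip().str.lower())

def process_datetime_columns(path):
    # Load and parse only the transaction timestamp column
    return pd.read_csv(path, usecols=['transaction_date'],
                       parse_dates=['transaction_date'])

def process_numeric_columns(path):
    # Load only the numeric columns and return a NumPy array
    # so that vectorization tricks can be applied downstream
    num_df = pd.read_csv(path, usecols=['amount', 'quantity'])
    return num_df.to_numpy(dtype=np.float64)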

Explicitly Specify/Convert Data Types

The default data types in pandas are not designed to be the most memory efficient. This is especially true for text/string data columns with relatively few unique values (alternatively known as low-cardinality data). By using more efficient and targeted data types, you can significantly reduce the memory usage and process larger datasets.

There is a dataset called loan_data.csv supplied with the book. Let’s see how explicitly specifying the data type can reduce the memory usage while working with this dataset:
import pandas as pd
df = pd.read_csv("../loan_data.csv")
df.memory_usage(deep=True)
The memory_usage() function (with deep=True) shows the true memory usage of the in-memory object. The output is shown in Figure 9-5. The output of df.info() is shown in the same figure, indicating that default loading assigned the general-purpose object data type to the text column (purpose) while the other columns were assigned data types like int64 or float64.
Figure 9-5

Default data loading assigned a general-purpose data type to a text/string column, causing it to take up too much memory

You might also have noticed that credit.policy takes on only the values 1 or 0. Why do you need a 64-bit integer data type to represent that? So, let’s also convert the type of that column:
df['credit.policy'].unique()
>> array([1, 0], dtype=int64)
Here is the code for doing the data type conversions (explicit specifications):
df2 = df.copy()
df2['purpose'] = df2['purpose'].astype('category')
df2['credit.policy'] = df2['credit.policy'].astype('uint8')
del(df)

Here, you first make a copy of the existing DataFrame. Then, you use the astype function to assign the category data type to the purpose column and uint8 (8-bit unsigned integer) to the credit.policy column. Lastly, as a good practice, you delete the old DataFrame object from memory since you no longer need it for your data science pipeline.

You can see the stark difference in Figure 9-6.
Figure 9-6

Loan dataset memory usage after explicit data type specification/conversion

This memory saving may seem trivial for this example, but small savings like this add up quickly for a long and data-intensive pipeline and can reduce the total overhead significantly.
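A related trick, shown here as a sketch using the same two columns of loan_data.csv discussed above, is to pass the data types directly to read_csv so the memory-hungry defaults are never materialized in the first place:
import pandas as pd

# Assumes credit.policy has no missing values (otherwise uint8 conversion fails)
dtype_map = {'credit.policy': 'uint8', 'purpose': 'category'}
df_lean = pd.read_csv("../loan_data.csv", dtype=dtype_map)
# Columns not listed in dtype_map keep pandas' default inferred types
df_lean.memory_usage(deep=True)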

Libraries for Parallel Processing

Parallel computing is an extensive field of its own. It is not trivial to implement optimized code in Python that will execute parallel threads/processes flawlessly and with high performance. Fortunately, there are some fantastic Python frameworks for doing parallel processing with minimal learning curves.

I will discuss a couple of them, Dask and Ray, with hands-on examples in the next chapter, so I won’t get into those details here.

Libraries for Handling Out-of-Core Datasets

There are special libraries to handle out-of-core datasets. Vaex and Modin are two such frameworks. Let’s discuss them in more detail with hands-on examples next.

A Note About the Preferred OS

Although many data scientists use Windows OS for their day-to-day tasks, it has been observed (in general, and while doing the technical review of this book) that advanced libraries like Vaex, Modin, Ray, and Dask may have trouble being set up or performing smoothly on Windows. Therefore, you are strongly encouraged to use a Linux-based OS for practicing with these libraries and running some of the Jupyter notebooks that are provided. You can do one of the following:
  • Use a Linux-based OS (e.g., Ubuntu, Fedora, or Red Hat) on your local machine natively

  • Run a virtual machine (VM) using tools like Oracle VirtualBox on your Windows-based machine, with a Linux-based OS on the VM

  • Use a cloud instance with a Linux-based OS (including the Amazon Linux flavor that comes with any EC2 instance)

Hands-On Example with Vaex

Vaex is a Python library designed for working with lazy, out-of-core DataFrames. One of its central goals is to help visualize and explore big tabular datasets. Vaex is highly performant for large datasets. For example, it can calculate statistics such as mean, sum, count, and standard deviation on an N-dimensional grid at a rate of up to a billion objects/rows per second.

In this section, you will see hands-on examples of such calculations and visualizations with the Vaex library.

Features at a Glance

Here is a quick summarization of the key features of Vaex:
  • Performance: It can work easily with huge tabular data. Its processing capability is on the order of a billion rows/second.

  • Lazy/virtual columns: The computation is done on the fly, without wasting precious RAM/virtual memory.

  • Memory efficient: No memory copies when doing routine data slicing such as filtering/selections/subsets.

  • Visualization: Natively and directly supported, with lots of functions to produce routine visualizations from huge tabular datasets.

  • User-friendly API: The DataFrame object is the main API, and it is all that a general user will ever need. The API feels very similar to pandas and therefore presents a minimal learning curve when replacing pandas code with Vaex for out-of-core data processing.

  • Lean and compartmentalized: Vaex is separated into multiple subpackages, and you can install any combination of them per your specific needs. For example, Vaex-astro supports astronomy-related transformations and FITS file reading, and Vaex-viz supports all visualizations. If all you want is to calculate statistics and not visualize the data, you don’t have to install the latter. For modern file types like Apache Arrow, there is a package named Vaex-arrow.

Basic Usage Example

Start by using an example dataset provided with Vaex:
import vaex
df = vaex.example()

When you run this code for the first time, it will download the dataset from the Web, so an Internet connection is required. It will store the dataset (an .hdf5 file) in a folder called data.

You can examine the information about the file:
df.info()
You will see something like Figure 9-7.
Figure 9-7

Vaex example dataset information

The slicing and indexing of the data are just like pandas. For example, say you want to see only the x, y, vx, and vy columns for rows 3 to 7:
df[['x','y','vx','vy']][3:8]
It will give you the expected output (Figure 9-8).
Figure 9-8

Vaex example of indexing the dataset

The calculation of statistics is fast. On my laptop, calculating the mean of 330,000 rows took under 20ms.
%%timeit
df.x.mean()
>> 19.6 ms ± 2.1 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

No Unnecessary Memory Copying

The best thing about Vaex is that it does not create unnecessary copies of DataFrame objects while doing simple filtering operations or intermediate calculations. Even the base DataFrame has minimal memory impact. The computations are done in a lazy, on-the-fly (when necessary) manner.

Check the memory footprint with this code:
import sys
# Vaex dataframe
print("Size of Vaex DF:", sys.getsizeof(df))
# Convert to Pandas dataframe
df_pandas = df.to_pandas_df()
print("Size of Pandas DF:", sys.getsizeof(df_pandas))
The output is astonishing:
Size of Vaex DF: 48
Size of Pandas DF: 13530144

You can run all the necessary calculations on the Vaex dataframe object with much less worry about memory errors cropping up.

For example, you can filter the dataframe for only those rows that have negative x values and positive z values:
df_filtered = df[(df.x < 0) & (df.z > 0)]
sys.getsizeof(df_filtered)
>> 48
As expected, the new df_filtered dataframe still has a low memory footprint, but it has only 164,464 rows compared to the original 330,000 rows.
df_filtered.shape
>> (164464, 11)

Expressions and Virtual Columns

You can create custom expressions and assign them to virtual columns, again with no memory copying. With pure pandas code, every such operation runs the chance of creating memory overhead. Let’s say you want to calculate the square root of the sum of the squares of two columns from the example dataset:
import numpy as np
sqroot_exp = np.sqrt(df['x']**2+df['y']**2)
Now, if you examine this sqroot_exp, you will see that it is a special expression (not evaluated yet). It has not created any memory overhead.
type(sqroot_exp)
>> vaex.expression.Expression
If you do this in pandas, it will create a pandas series object:
sqroot_pandas = np.sqrt(df_pandas['x']**2+df_pandas['y']**2)
type(sqroot_pandas)
>> pandas.core.series.Series
Now, such Vaex expressions can be added to a DataFrame, creating a virtual column. These virtual columns are similar to normal DataFrame columns, except they do not waste any memory.
# Assignment of expression to a virtual column
df['sqroot'] = sqroot_exp
# Evaluation only when needed
df['sqroot'].mean()
>> array(8.38820497)

Computation on a Multidimensional Grid

One of the most interesting features of Vaex is the ability to calculate statistics on user-selectable grids in a fast and efficient manner. This has many practical applications when you are interested in finding local minima or maxima or the distribution of numeric quantities over specific regions from a maze of numbers.
counts_x = df.count(binby=df.x, limits=[-5, 5], shape=32)
counts_x
>> array([4216, 4434, 4730, 4975, 5332, 5800, 6162, 6540, 6805, 7261, 7478, 7642, 7839, 8336, 8736, 8279, 8269, 8824, 8217, 7978, 7541, 7383, 7116, 6836, 6447, 6220, 5864, 5408, 4881, 4681, 4337, 4015], dtype=int64)

The result is nothing but a NumPy array with the counts in 32 bins distributed between x = -5 and x = 5. The key thing to note here is the binby argument, which works similarly to GROUP BY in SQL or groupby in pandas. Here the data was grouped (binned) by the x column (binby=df.x).

So, with this single line of code, you
  • Filtered/restricted the data to between -5 and 5

  • Counted the number of data points

  • Binned the counts in 32 bins

Figure 9-9 shows the visualization.
Figure 9-9

Counts of x column data for a specific range and bin count
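A minimal sketch of how such a plot could be produced from the counts_x array with Matplotlib (the figure in the book may have been generated differently; 32 equal-width bins spanning -5 to 5 are assumed):
import numpy as np
import matplotlib.pyplot as plt

# Approximate bin centers for 32 equal-width bins spanning [-5, 5]
bin_width = 10 / 32
bin_centers = np.linspace(-5, 5, 32, endpoint=False) + bin_width / 2

plt.figure(figsize=(6, 3))
plt.plot(bin_centers, counts_x)
plt.xlabel("x")
plt.ylabel("count")
plt.show()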

Want a more powerful example? You can calculate the square root of the sum of the squares of the velocities vx, vy, and vz to get the resultant velocity. However, you may want to do it only for a certain range of x and y data and bin the result for easy visualization.
# Just an expression
velo = np.sqrt(df.vx**2 + df.vy**2 + df.vz**2)
# Pass the expression to the function
# Binned by x and y, over limits of -10 to 10
xy_mean_v = df.mean(velo, binby=[df.x, df.y],
                    limits=[[-10, 10], [-10, 10]],
                    shape=(64, 64))
You can do a 2D plot of the resultant velocity over the same x-y range:
import matplotlib.pyplot as plt

plt.figure(dpi=120, figsize=(3,3))
plt.imshow(xy_mean_v.T,
           origin='lower',
           extent=[-10, 10, -10, 10])
plt.show()
Figure 9-10 shows the result.
Figure 9-10

Resultant velocity calculated and visualized over specific ranges of x and y data

Dynamic Visualizations Using Widgets and Other Plotting Libraries

The N-dimensional grid-based computation is designed to be fast in Vaex. This allows you to extend the visualizations to be dynamic using widgets and third-party libraries like bqplot. Unfortunately, these dynamic visualizations cannot be rendered in the pages of a book, but some code and results are shown in the accompanying Jupyter notebook.

For example, a simple call to Vaex’s plot widget method creates a plot widget in the Jupyter notebook where you can pan and zoom around and choose a few data transformations from a drop-down menu. (Figure 9-11 shows a static snapshot of the widget.)
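The exact call is in the accompanying notebook; as a rough sketch (assuming the vaex-jupyter components are installed, and noting that argument names can vary between Vaex versions), it looks something like this:
# Interactive widget in a Jupyter notebook; pan/zoom and transformation
# choices are provided by the widget itself
df.plot_widget(df.x, df.y, f='log1p')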
Figure 9-11

Snapshot of a dynamic visualization with a Vaex plot widget method

The usefulness of such utility methods cannot be overemphasized for large-scale data analysis. You can plot complex transformations on large, out-of-core datasets (say 10GB or 20GB in size) with only a few lines of code to visualize the hidden patterns. This increases the productivity and efficiency of such a data analysis pipeline far beyond what would have been possible with only pandas and Matplotlib code.

Vaex Preferred HDF5 Format

The magic in Vaex happens because of internal optimization and data representation. One of the design choices is to work with HDF5 file formats as much as possible. Therefore, the best way to work with Vaex is to load other types of data into this format before you start exploring. For convenience, Vaex provides many utility methods to convert other files or data structures to this format. You can convert from CSVs, Arrow tables, Python dictionaries, NumPy arrays, JSON, and more.
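As a small illustration (a sketch using the vaex.from_arrays and vaex.from_pandas converters available in recent Vaex versions), in-memory objects can be wrapped directly:
import numpy as np
import pandas as pd
import vaex

# From NumPy arrays
x = np.arange(1_000_000)
vdf1 = vaex.from_arrays(x=x, y=x ** 2)

# From an existing pandas DataFrame
pdf = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
vdf2 = vaex.from_pandas(pdf)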

For example, this code converts a moderate-sized CSV file (close to a million rows and 15 columns) into an HDF5 file:
df2 = vaex.from_csv("Large-data.csv", convert=True)

When you run this code, another file named Large-data.csv.hdf5 gets created in the folder where the Large-data.csv file resides. You must not forget to set convert=True for this to happen.

After that, you can read/open this HDF5 file much faster than what is possible with pandas CSV reading. Here is the complete code:
import time
import pandas as pd

# Pandas reading CSV
t1 = time.time()
df2_pandas = pd.read_csv("Large-data.csv")
t2 = time.time()
print(f"Took {round(1000*(t2-t1),3)} milliseconds with Pandas")
# Vaex conversion from CSV
df2 = vaex.from_csv("Large-data.csv", convert=True)
# Vaex reading HDF5
t1 = time.time()
df2 = vaex.open("Large-data.csv.hdf5")
t2 = time.time()
print(f"Took {round(1000*(t2-t1),3)} milliseconds with Vaex")
The results speak for themselves:
Took 2354.523 milliseconds with Pandas
Took 14.057 milliseconds with Vaex
In today’s world of data science, a dataset with a million rows is not a particularly large one. Even this modest-sized file caused pandas to take over 2 seconds to read from the disk. With Vaex, after conversion to HDF5, reading becomes much faster. Therefore, for a data processing pipeline utilizing the power of libraries like Vaex, it makes sense to systematically convert all the text-based data files (CSV, or even JSON if that makes sense) to HDF5 and read them as Vaex DataFrames as much as possible, as illustrated in Figure 9-12.
Figure 9-12

Converting to HDF5 and working with Vaex results in a faster and more productive data science pipeline, particularly for out-of-core datasets

Hands-On Examples with Modin

Modin is a library whose actual utility falls into the realm of parallel or multi-core processing. It uses a Ray or Dask back end to provide an effortless way to speed up pandas notebooks, scripts, and libraries. The main attraction of Modin is its tight integration with, and identical API to, pandas.

You will see the use of Dask DataFrames and Ray in the next chapter. However, unlike those distributed DataFrame libraries, Modin provides seamless integration and compatibility with existing pandas code, including DataFrame construction. Basically, you just need to change a single line of code to get started.

Single CPU Core to Multi-Core

For most data science workloads, to use Modin you just start like this:
import modin.pandas as pd
From this simple one-line change, the benefit you get is enormous. This comes from the fact that, despite all its great features and capabilities, the core implementation of pandas is inherently single-threaded. This means that only one of the multiple CPU cores can be utilized at any given time while executing normal pandas code. On a single-CPU machine (e.g., a data scientist’s laptop), it would look something like Figure 9-13.
Figure 9-13

The pandas code utilizing only a single core of the system

However, just by wrapping the pandas code with Modin (a single-line code change), you can utilize all the cores (via a Dask or Ray backend cluster/worker system that Modin sets up), as shown in Figure 9-14.
Figure 9-14

Modin code utilizing all the CPU cores

Out-of-Core Processing

Let’s now demonstrate the out-of-core processing capability of Modin. Here, the phrase “core” does not refer to the CPU core but really to the system memory or RAM.

The following code creates a DataFrame with ~1 million (2**20, to be precise) rows and 256 (2**8) columns of random integers. Note the use of modin.pandas here.
import modin.pandas as pd
import numpy as np
raw_data = np.random.randint(0, 100, size=(2**20, 2**8))
df = pd.DataFrame(raw_data).add_prefix("col")
When you execute this code for the first time, you may see some user warnings and messages (Figure 9-15) about the Dask cluster setup (assuming that you are using the Dask back end for the parallel processing/clustering). In the next chapter, you will see how to start and monitor a Dask cluster. The good thing with Modin is that all of this gets taken care of under the hood, and the user doesn’t have to write the cluster setup code.
Figure 9-15

Warning message related to Dask cluster setup for Modin code execution (first time only)

You can check the information about the DataFrame:
df.info()
It will show something like this:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048576 entries, 0 to 1048575
Columns: 256 entries, col0 to col255
dtypes: int32(256)
memory usage: 1.0 GB
So, under the hood, it uses the pandas.core.frame.DataFrame class, but when you check the type of the DataFrame object, it is a Modin pandas object, not a regular pandas one.
type(df)
>> modin.pandas.dataframe.DataFrame
Now you come to the key part of this demo. The following code concatenates 20 such 1GB DataFrames into a single large DataFrame. Check out the time it takes to do this and think about what could have gone wrong if you had tried this with normal pandas code (assuming your local machine has 16GB of RAM).
import time
t1 = time.time()
big_df = pd.concat([df for _ in range(20)])
t2 = time.time()
print(f"Took {round(1000*(t2-t1),3)} milliseconds")
It should be done in under a second.
Took 236.584 milliseconds
If you want to see the shape of this large DataFrame big_df:
big_df.shape
>> (20971520, 256)

So, it does have over 20 million rows and 256 columns. This would be almost impossible to handle as a persistent in-memory object with pure pandas.

If you check the memory usage explicitly:
big_df.memory_usage(deep=True)
>>
Index     167772160
col0       83886080
col1       83886080
col2       83886080
col3       83886080
            ...
col251     83886080
col252     83886080
col253     83886080
col254     83886080
col255     83886080
Length: 257, dtype: int64

So, each column’s memory usage is over 80MB. In total, for 256 columns (and one index), this represents over 20GB of memory usage. My laptop has only 16GB of RAM, and the Jupyter notebook where this code is being run certainly did not take up all of it. This is a direct demonstration of out-of-core computing with Modin.

You can treat this large DataFrame as a regular pandas DataFrame for all purposes from now on. For example, calculating the mean of col0 takes under 3 seconds.
t1 = time.time()
big_df['col0'].mean()
t2 = time.time()
print(f"Took {round(1000*(t2-t1),3)} milliseconds for calculating mean of col0")
>> Took 2677.044 milliseconds for calculating mean of col0
How about calculating the mean of the entire DataFrame? Instead of one column, now you are operating over all 256 columns of data.
t1 = time.time()
big_df.mean()
t2 = time.time()
print(f"Took {round(1000*(t2-t1),3)} milliseconds for calculating mean of the entire DataFrame")
>> Took 37654.585 milliseconds for calculating mean of the entire DataFrame

So, the time goes from 2.7 seconds to 37.7 seconds, roughly a 14x increase rather than the 256x you might naively expect. This is the fruit of parallel processing and of allocating data chunks optimally to each worker that the Dask cluster has set up in the background.

Other Features of Modin

Modin is a live, open-source project and new contributions get added all the time. It also has
  • Distributed XGBoost support for fast machine learning

  • Standard SQL connection support to execute SQL queries on databases

  • Gradually maturing support for various input data ingestion APIs (reading all kinds of files and data formats). In this matter, if something is tricky to support, it defaults to the pandas reading API automatically for ingestion, and then processes the object as a Modin DataFrame.

For more details and updates, interested readers should definitely check out the official documentation at https://modin.readthedocs.io/en/stable/index.html.

Summary

In this chapter, you started addressing the concept of scaling a data science workload out to multiple CPU cores and beyond the system memory. This is particularly important for dealing with increasingly larger datasets, going from the realm of megabytes to gigabytes to terabytes and more. The conventional Python data science ecosystem using NumPy, pandas, and Matplotlib is great for smaller datasets but becomes inefficient when dealing with large file sizes, particularly when reading from the disk or performing aggregation and statistical computations. pandas may throw memory errors in many seemingly trivial situations involving multi-GB datasets because it makes a lot of unnecessary memory copies while doing regular data wrangling.

You explored common tricks and techniques within pandas to address these issues, such as selective data loading and explicit type setting. Then, you followed hands-on examples of out-of-core computing and scalability to large files and dataset sizes with two powerful libraries, Vaex and Modin. Doing data transformation (or visualization) on such large datasets would have been slow and inefficient with pure pandas code.

Among these, Modin uses a Ray or Dask back end for distributing computing load to multiple CPU cores. In the next chapter, you will take this discussion of scalable data science further by exploring these parallel or distributed computing aspects.
