Chapter 9. Techniques for Predictive Analytics in Production

An overarching theme throughout this book has been the accessibility of machine learning. Many powerful, well-understood techniques have been around for decades. What has changed in the past few years is the parallel advance of software and hardware that has led to the rise of distributed data processing systems.

Real-Time Event Processing

The definition of real time, in terms of the length of the time window it implies, varies dramatically by industry and application. However, a few design principles can improve predictive analytics performance across a wide variety of applications.

Designing a data processing system is a process of deciding when and where computation will happen. In general, all data requires some degree of processing before it can be analyzed. System architects must decide what processing happens at which stage of the data pipeline. At a high level, it is a choice between preprocessing data, which requires more work at the outset but makes the data easier to query, and simply capturing and storing data in its arrival format and doing additional processing at query time.

Structuring Semi-Structured Data

For example, suppose that you are tracking user behavior on an ecommerce website. Most of this information arrives as semi-structured event data with fields like user ID, page ID, and timestamp. In fact, there are probably several different types of events: product page view, product search, customer account login, product purchase, view related product, and so on. A single user session likely consists of tens or hundreds of events, and a high-traffic ecommerce website might produce thousands of events per second.

One option is to store each event record in its arrival format. This solution requires essentially no preprocessing and records can go straight to a database (assuming the database supports JSON or another semi-structured data format). Querying the data will likely require performing some aggregation on the fly because every event is its own record.

You could use a key-value or document store, but then you would sacrifice query capabilities like JOINs. Moreover, most modern relational databases offer some mechanism for storing and querying semi-structured data. MemSQL, for example, offers a JSON column type. With a modern database that processes, stores, and queries relational data together with semi-structured data, even in the same query, there are few practical business reasons to use a datastore without familiar relational capabilities.

Suppose that you store each event as its own record in a single-column table, where the column is of type JSON.

CREATE TABLE events ( event JSON NOT NULL );

EXPLAIN events;
+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| event | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+
1 row in set (0.00 sec)

This approach requires little preprocessing before inserting a record.

INSERT INTO
    events ( event )
VALUES
    ( '{ "user_id": 1234, "purchase_price": 12.34 }' );

The query to find the sum total of all of one customer’s purchases might look like the following:

SELECT
    event::$user_id user_id,
    SUM(event::$purchase_price) total_spent
FROM
    events
WHERE
    event::$user_id = 1234
GROUP BY
    user_id;

This approach will work for small datasets, up to tens or hundreds of thousands of records, but even then it will add some query latency because you are querying schemaless objects. The database must check every JSON object to make sure it has the proper attributes (purchase_price, for example) and also compute an aggregate. As you add more event types with different sets of attributes and data volumes grow, this type of query becomes expensive.

A possible first step is to create computed columns that extract the values of JSON attributes. You can specify these computed columns when creating the table, or add them afterward with ALTER TABLE (sketched after the following example).

CREATE TABLE events (
    event JSON NOT NULL,
    user_id AS event::$user_id PERSISTED INT,
    price AS event::$purchase_price PERSISTED FLOAT
);
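
If the events table already exists, the same computed columns can be added after the fact. The following is a sketch of the ALTER TABLE form, assuming MemSQL accepts the same computed-column clause used in CREATE TABLE:

ALTER TABLE events ADD COLUMN user_id AS event::$user_id PERSISTED INT;
ALTER TABLE events ADD COLUMN price AS event::$purchase_price PERSISTED FLOAT;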

Either way, the values for user_id and purchase_price are extracted when the record is inserted, which eliminates that computation at query execution time. You can also add indexes on the computed columns to improve scan performance if desired (an example follows below). Records without user_id or purchase_price attributes will have NULL values in those computed columns. Depending on the number of event types and their overlap in attributes, it might make sense to normalize the data and divide it into multiple tables. Normalization and denormalization are usually not a strict binary; you need to find a balance between insert latency and query latency. Modern relational databases like MemSQL enable a greater degree of normalization than traditional distributed datastores because of distributed JOIN execution. Even though concepts like business intelligence and star schemas are commonly associated with offline data warehousing, in some cases it is possible to report on real-time data using these techniques.
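
As a small example, an index on the computed user_id column might be added with an ordinary CREATE INDEX statement (the index name is illustrative):

CREATE INDEX user_id_idx ON events (user_id);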

Suppose that you have two types of events: purchases and product page views. The tables might look like this:

CREATE TABLE purchases (
    event JSON NOT NULL,
    user AS event::$user_id PERSISTED INT,
    session AS event::$session_id PERSISTED INT,
    product AS event::$product_id PERSISTED TEXT,
    price AS event::$purchase_price PERSISTED FLOAT
);

CREATE TABLE views (
    event JSON NOT NULL,
    user AS event::$user_id PERSISTED INT,
    session AS event::$session_id PERSISTED INT,
    product AS event::$product_id PERSISTED TEXT,
    time_on_page AS event::$time_on_page PERSISTED INT
);

We assume that views will contain many more records than purchases, given that people don’t buy every product they view. This motivates the separation of purchase events from view events because it saves storage space and makes it much easier to scan purchase data.

Now, suppose that you want a consolidated look into both views and purchases for the purpose of training a model to predict the likelihood of purchase. One way to do this is with a VIEW that joins purchases to views on user and product and aggregates the view statistics for each purchase.

CREATE VIEW v AS
SELECT
    p.user user,
    p.product product,
    p.price price,
    COUNT(v.user) num_visits,
    SUM(v.time_on_page) total_time
FROM
    purchases p INNER JOIN views v
        ON p.user = v.user AND p.product = v.product
GROUP BY
    p.user, p.product, p.price;

Now, as new page view and purchase data comes in, that information will immediately be reflected in queries on the view. Although normalization and VIEWs are familiar concepts from data warehousing, it is only recently that they could be applied to real-time problems. With a real-time relational database like MemSQL, you can perform business intelligence-style analytics on changing data.
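
For example, a query against the view can pull the consolidated features for a single user (reusing the illustrative user ID from earlier):

SELECT * FROM v WHERE user = 1234;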

These capabilities become even more powerful when combined with transactional features like UPDATE and "upsert" (INSERT ... ON DUPLICATE KEY UPDATE) statements. This allows you to maintain real-time statistics, like counts and averages, even for very high-velocity data.
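
As a minimal sketch, a running per-user purchase summary might be maintained with an upsert. The user_stats table and its columns are illustrative and not part of the schema above; each incoming purchase either inserts the first row for a user or increments the existing one:

CREATE TABLE user_stats (
    user_id INT PRIMARY KEY,
    num_purchases INT NOT NULL,
    total_spent FLOAT NOT NULL
);

INSERT INTO user_stats ( user_id, num_purchases, total_spent )
VALUES ( 1234, 1, 12.34 )
ON DUPLICATE KEY UPDATE
    num_purchases = num_purchases + 1,
    total_spent = total_spent + 12.34;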

Real-Time Data Transformations

In addition to structuring data on the fly, there are many tasks traditionally thought of as offline operations that can be incorporated into real-time pipelines. In many cases, performing some transformation on data before applying a machine learning algorithm can make the algorithm run faster, give more accurate results, or both.

Feature Scaling

Many machine learning algorithms assume that the data has been standardized in some way, which generally involves scaling relative to the feature-wise mean and variance. A common and simple approach is to subtract the feature-wise mean from each sample feature and then divide by the feature-wise standard deviation. This kind of scaling helps when one or a few features have much larger variance than the others and would otherwise dominate training. Variance scaling can, for example, dramatically speed up training time for a Stochastic Gradient Descent regression model.

The following shows a variance scaling transformation using a scaling function from the scikit-learn data preprocessing library:

>>> from memsql.common import database
>>> from sklearn import preprocessing
>>> import numpy as np
>>> with database.connect(host="127.0.0.1", port=3306, 
... user = "root", database = "sample") as conn:
...     a = conn.query("select * from my_table")
>>> print a
[Row({'a': 0.0, 'c': -1.0, 'b': 1.0}), 
Row({'a': 2.0, 'c': 0.0, 'b': 0.0}), 
Row({'a': 1.0, 'c': 2.0, 'b': -1.0})]
>>> n = np.asarray(a.rows)
>>> print n
[[ 0.  1. -1.]
 [ 2.  0.  0.]
 [ 1. -1.  2.]]
>>> n_scaled = preprocessing.scale(n)
>>> print n_scaled
[[-1.22474487  1.22474487 -1.06904497]
 [ 1.22474487  0.         -0.26726124]
 [ 0.         -1.22474487  1.33630621]]

This approach finds a scaled representation for a particular set of feature vectors. You can also use the feature-wise means and standard deviations to create a generalized transformation into the variance-standardized space.

>>> n_scaler = preprocessing.StandardScaler().fit(n)
>>> print n_scaler.mean_
[ 1.          0.          0.33333333]
>>> print n_scaler.scale_
[ 0.81649658  0.81649658  1.24721913]

With this information, you can express the generalized transformation as a view in the database.

CREATE VIEW scaled AS
SELECT
    (t.a - 1.0) / .8164 scaled_a,
    (t.b - 0.0) / .8164 scaled_b,
    (t.c - 0.33) / 1.247 scaled_c
FROM
    my_table t;

Now, you can interactively query or train a model against the scaled view. Any new records inserted into my_table will immediately show up in their scaled form in the scaled view.
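
For instance, inserting a new row into my_table (which has the numeric columns a, b, and c used above) makes its standardized form immediately visible through the view. The values below are illustrative:

INSERT INTO my_table ( a, b, c ) VALUES ( 2.0, -1.0, 1.0 );

SELECT * FROM scaled;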

Real-Time Decision Making

When you optimize real-time data pipelines for fast training, you open new opportunities to apply predictive analytics to business problems. Modern data processing techniques confound the terminology we traditionally use to talk about analytics. The “online” in Online Analytical Processing (OLAP) refers to the experience of an analyst or data scientist using software interactively. “Online” in machine learning refers to a class of algorithms for which the model can be updated iteratively, as new records become available, without complete retraining that needs to process the full dataset again.

With an optimized data pipeline, there is another category of application that uses models that are "offline" in the machine learning sense but also don't fit into the traditional interaction-oriented definition of OLAP. These applications fully retrain a model using the most up-to-date data, but do so in a narrow time window. When data is changing, a predictive model trained in the past might not reflect current trends. The frequency of retraining depends on how long a newly trained model remains accurate, and this interval will vary dramatically across applications.

Suppose that you want to detect recent trends in financial market data and build an application that alerts you when a security is dramatically increasing or decreasing in value. You might even want to build an application that automatically executes trades based on trend information.

We’ll use the following example schema:

CREATE TABLE `ask_quotes` (
    `ticker` char(4) NOT NULL,
    `ts` BIGINT UNSIGNED NOT NULL,
    `ask_price` MEDIUMINT(8) UNSIGNED NOT NULL,
    `ask_size` SMALLINT(5) UNSIGNED NOT NULL,
    `exchange` ENUM('NYS','LON','NASDAQ','TYO','FRA') NOT NULL,
    KEY `ticker` (`ticker`,`ts`)
);
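
Each row in ask_quotes is a single ask quote, which can be recorded with an ordinary INSERT; the values below are illustrative:

INSERT INTO ask_quotes ( ticker, ts, ask_price, ask_size, exchange )
VALUES ( 'ABCD', 1453125601000, 105, 300, 'NYS' );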

In a real market scenario, new offers to sell securities ("asks") stream in constantly. With a real-time database, you can not only record and process asks but also serve data for analysis simultaneously. The following is a very simple Python program that detects trends in market data. It continuously polls the database, selecting the most recent ask offers for a ticker that fall within one standard deviation of that ticker's mean ask price. With this recent sample, it trains a linear regression model, which returns a slope (the "trend" you are looking for) along with additional information about variance and how much confidence you should have in your model.

#!/usr/bin/env python

import time

from scipy import stats
from memsql.common import connection_pool

pool = connection_pool.ConnectionPool()
db_args = [<HOST>, <PORT>, <USER>, <PASSWORD>, <DB_NAME>]

# ticker for the security whose price you are modeling
TICKER = <ticker>

while True:
    with pool.connect(*db_args) as c:
        # Select the most recent asks for this ticker that fall within
        # one standard deviation of the ticker's mean ask price.
        a = c.query('''
            SELECT ask_price, ts
            FROM (
                SELECT *
                FROM ask_quotes
                WHERE ticker = "{0}"
                ORDER BY ts DESC LIMIT 10000) window
            JOIN (
                SELECT AVG(ask_price) avg_ask
                FROM ask_quotes WHERE ticker = "{0}") avg
            JOIN (
                SELECT STD(ask_price) std_ask
                FROM ask_quotes WHERE ticker = "{0}") std
            WHERE abs(ask_price - avg.avg_ask) < std.std_ask;
        '''.format(TICKER))

        # Fit a simple linear regression of ask price against time.
        x = [row['ts'] for row in a]
        y = [row['ask_price'] for row in a]

        slope, intercept, r_val, p_val, err = stats.linregress(x, y)

    # Throttle polling; tune this interval to your application.
    time.sleep(1)

With the information from the linear regression, you can build a wide array of applications. For instance, you could make the program send a notification when the slope of the regression line crosses certain positive or negative thresholds. A more sophisticated application might autonomously execute trades using market trend information. In the latter case, you almost certainly need to use a more complex prediction technique than linear regression. Selecting the proper technique requires balancing the need for low training latency versus the difficulty of the prediction problem and the complexity of the solution.
