Chapter 5. Design Patterns

Now that you have a good grounding in the habits that are characteristic of successful large-scale production deployments and an understanding of some of the capabilities to look for in data platform and containerization technologies with which you build your systems, it’s useful to see how that comes to life in real-world situations. In this chapter, we present a collection of design patterns you can use across a wide range of use cases. These patterns are not industry specific and may be combined in a variety of ways to address particular business goals. Most important, these design patterns are not theoretical. We base them on what we see customers doing in successful large-scale production systems.

Your challenge is to figure out how you can put these powerful underlying patterns to use in your own situation. As you read this chapter, you may find it useful to skim the first paragraph of each design pattern to find those that relate to your needs.

Internet of Things Data Web

More and more businesses are building elements of intelligence, control, and reporting into physical products. These products can be cars, medical devices, shipping containers, or even kitchen appliances. As they build these features into their products, these businesses all have a common problem of moving data from these products to a central facility and pushing software updates back to the devices. Doing this requires secure transport of data from the things in the field back to an analytics system. One commonly requested feature of such a system is to build what is known as a digital shadow of each thing for informational purposes or diagnostics. Other goals include computing aggregate statistics of product feature usage and distribution of product updates.

The most important constraints in this design pattern are data security, scale and reliability in a difficult and nearly uncontrolled working environment.

Figure 5-1 shows a basic design that is common for this pattern. The process starts at the device, of which there might be hundreds—or hundreds of millions. Each device records status and sensor data for central analysis but also needs to get software and configuration updates. These updates may be targeted to one particular device or class of devices. It is almost universal that the data produced by the device must be kept private and that device data and software updates must be unforgeable.

Figure 5-1. In the Internet of Things Data Web design pattern, data flows to and from devices across a link (1) secured by standard public-key cryptographic protocols. Both device and gateway identity are secured by certificates. Updates (2) to configuration and firmware can be sent to devices via this link. Data from devices is forwarded (3) to a comprehensive log for all devices so that it can be rendered into an overall dashboard. This same data is sent to a stream (4) where the topic is the device ID. This allows per device histories known as device shadows to be maintained by direct access to the by_device stream and without a database.

Dashboards For All or For Each

At the point labeled 3 in Figure 5-1, device messages can be aggregated for dashboards. Such dashboards are very handy for verifying overall system function, especially if device anomaly detection is used to highlight unusual behavior in the population of devices.

If you have a streaming system that can handle a large number of topics, it is a nice step to copy the events to a stream where the topic is equal to the device ID. This corresponds to item 4 in Figure 5-1 and allows you to have a real-time look at any single device very easily with no database overheads. If you are using a system that doesn’t like high topic diversity, such as Apache Kafka, you will likely need to use a database for this, but this can be problematic if you have high update rates. Even if each device sends a message only rarely, the total rate can be pretty high. For instance, 100 million devices sending one message every 100 seconds results in a million messages per second. That is pretty easily handled by a well-partitioned streaming system, but it can be a bit more strenuous to handle that kind of update rate with a database. It isn’t impossible, but you need to budget for it. Of course, if you have 1,000 devices reporting once per hour, the update rate isn’t a big deal, and any technology will work.
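To make the by_device idea concrete, here is a minimal sketch of the fan-out at points 3 and 4 of Figure 5-1, written against the kafka-python client. The broker address, topic names, and message fields are illustrative assumptions, and as noted above, the per-device topic scheme presumes a streaming system that is comfortable with very high topic counts.

```python
# Minimal sketch: fan out each device message to a fleet-wide topic and to a
# per-device topic. Broker address, topic names, and fields are hypothetical.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="gateway-cluster:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def forward(event):
    # Point 3 in Figure 5-1: one comprehensive log for dashboards over all devices.
    producer.send("all_devices", event)
    # Point 4 in Figure 5-1: a topic named after the device ID acts as the
    # device shadow, the full history of one device, with no database involved.
    producer.send("by_device_" + event["device_id"], event)

forward({"device_id": "d-000417", "temp_c": 41.2, "fw": "1.4.2"})
producer.flush()
```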

The key step in getting an IoT system of this kind into production is building out the gateway and the first streaming layer. Getting security built into the system correctly from the beginning is critical because almost everything else can be layered in after you are up and running.

Data Warehouse Optimization

One of the most straightforward ways to start with a big data system is to use it to optimize your use of a costly data warehouse. The goal of data warehouse optimization is to make the best use of your data warehouse (or relational database) resources in order to lower costs and keep your data warehouse working efficiently as your data scales up. One way to do this that offers a big payback is to move early Extract, Transform, and Load (ETL) processing and staging tables off of the data warehouse onto a cluster running Apache Spark or Apache Drill for processing. This approach is advantageous because these ETL steps often consume the majority of the processing power of the data warehouse, but they constitute only a small fraction of the total lines of code. Moreover, the staging tables that are inputs to these ETL steps are typically much larger than subsequent tables, so moving these tables to an external cluster can result in substantial space savings. You gain an advantage by relieving strain on the data warehouse at your current data volumes, plus you'll have set up a highly scalable system that will continue to work in a cost-effective way even as data volumes grow enormously. It makes sense to move each part of the process to the platform on which it works most efficiently. Often, initial data ingestion and ETL make sense on Spark or Drill, whereas it may make sense to keep critical-path traditional processes on the data warehouse as before.
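As a rough illustration of what an offloaded ETL step looks like, the following PySpark sketch reads staging data from the cluster, refines it, and writes a compact extract suitable for bulk loading into the warehouse. The file paths, column names, and transformation rules are hypothetical.

```python
# Minimal sketch of an ETL step moved off the data warehouse onto Spark.
# Paths, columns, and the filter rule are illustrative assumptions.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("dw-offload-etl").getOrCreate()

# Raw data lands on the cluster instead of in warehouse staging tables.
raw = spark.read.option("header", "true").csv("/staging/orders/2023-06-01/")

# Cleanse and pre-aggregate before the warehouse ever sees the data.
refined = (
    raw.filter(F.col("order_status") != "CANCELLED")
       .withColumn("order_total",
                   F.col("quantity").cast("double") * F.col("unit_price").cast("double"))
       .groupBy("customer_id", "order_date")
       .agg(F.sum("order_total").alias("daily_total"))
)

# Write a compact extract that standard bulk-import tools can load into the
# warehouse, as in panel B of Figure 5-2.
refined.write.mode("overwrite").option("header", "true").csv("/export/orders_daily/")
spark.stop()
```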

Figure 5-2 shows the evolution of a data warehouse system as data warehouse optimization proceeds. Initially, in the top panel, we see the traditional view of the process. Data is ingested by copying it into a networked storage system. This data is then imported into the staging tables on the actual data warehouse, and important data is extracted and transformed before loading (ETL) and final processing. This use of staging tables is broadly like what we have seen in actual customer installations. Significantly, the majority of computational resources are typically consumed in the ETL processing, but only a small minority of the code complexity is in this phase.

Data warehouse optimization works by moving staging tables and ETL processing to an external cluster, as shown in panels B and C. This change eliminates the need for a storage system to facilitate the transfer and removes considerable processing and storage load from the data warehouse.

You can change this process by using a data platform to optimize the system, as shown in the bottom two panels of Figure 5-2. In the middle panel of the figure, we see how this optimization works using a MapR cluster. Here, data is copied from the original source to a network-mounted file system exactly as before, but now the storage system has been replaced by a MapR cluster that holds the staging tables. All or some of the ETL process is run on the MapR cluster instead of the data warehouse, and then the work product of the ETL process is bulk loaded into the data warehouse using standard bulk import tools via another networked mount of the MapR cluster.

Figure 5-2. The evolution of a data warehouse system.

The lower panel of Figure 5-2 shows an alternative design for non-MapR platforms. The goal is the same, but there are some variations in how the data is ingested for ETL and how the refined data is exported to the data warehouse. The biggest difference is the use of specialized connectors to work around the lack of high-performance NFS access to the Hadoop Distributed File System (HDFS) cluster.

Exactly how much of the ETL process is moved to the cluster depends on the exact trade-off of code size, performance, and natural modularity in the code on the data warehouse. Typically, true extract and transform code runs much more efficiently on a cluster than a data warehouse, while advanced reporting code may well run faster on the data warehouse. These speed trade-offs must be measured empirically by converting sample queries, and the benefits of conversion then need to be balanced against fixed conversion costs and the variable savings of running the process on the external cluster. The final reporting code on the data warehouse is often large enough, complex enough, and difficult enough to test that the trade-off is clearly on the side of leaving it in place, at least initially.

When data warehouse optimization is done with some kind of Hadoop cluster, special-purpose connectors are required, as shown in the bottom panel (c) of Figure 5-2. This increases the complexity of the overall solution and thus increases the management burden and decreases reliability. With the MapR data platform (b), the need for connectors is avoided by using standard bulk export and bulk import utilities on the data source and data warehouse systems, respectively, together with direct access to the MapR cluster using standard file APIs.

The savings in using an external cluster for data warehouse optimization come from the displacement of the external storage and the substantial decrease in table space and ETL processing required on the data warehouse. This is offset slightly by the cost of the external cluster, but the net result is usually a substantial savings. In some cases, these savings come from needing a smaller data warehouse; in others, from delaying an upgrade or expansion of an existing data warehouse. In addition to a cost advantage, this style of cluster-based data warehouse optimization keeps all parts of the process running efficiently as your system grows. The move to an external cluster therefore future-proofs your architecture.

Extending to a Data Hub

A significant fraction of MapR customers name the centralization of data—sometimes called a data hub or data lake—as one of the most important early use cases. More and more, however, they are looking beyond the data hub to building a data fabric. The term “data hub” is very loosely defined, but the centralization concept is fairly simple and very powerful: by bringing together data from a variety of sources and data types (structured, unstructured, or semi-structured, including nested data) into a centralized storage accessible by many different groups for various types of analysis or export to other systems, you widen the possibilities for what insights you can harvest.

The concept of an enterprise data hub was one of the most commonly cited use cases for Hadoop clusters a few years ago. This reflected the fact that Hadoop clusters were becoming less specialized and more of a company-wide resource. There were commonly difficulties, however, in allowing multitenancy because different applications had a tendency to interfere with one another. The intent was that centralizing data would help break down unwanted data silos. Some forms of analysis, including some valuable approaches to machine learning, are greatly improved by being able to combine insights from more than one data source.

The data hub is a natural evolution from the data warehouse optimization use case, as well. Because the early stages of ETL bring in and save raw data, that same data can be accessed for other purposes, which can lead organically to the construction of a data hub. The relatively low cost of large-scale storage on big data systems relative to dedicated storage devices makes this particularly attractive. In Chapter 6, “Tip #2: Shift Your Thinking” refers to the benefit of delaying some decisions about how you want to process and use data. The data hub fits that idea by building a central source in which data can be used in a variety of ways for many different internal customers, some currently of interest, others to be discovered in the future, as depicted in Figure 5-3.

Figure 5-3. A data hub centralizes data from many sources and provides access to many users such as different groups of developers, data scientists, and business analysts. Here the reference database would be NoSQL HBase or MapR-DB. Having easy access to widely varied data makes new ideas and applications inevitable.

A data hub can also support development of a Customer 360 database, as described in the next section, along with ETL for data warehouse optimization, analysis of log data, processing of streaming data to be visualized on a dashboard, complex anomaly detection, other machine learning projects, and more. The common theme is that these clusters have a lot going on, in all kinds of ways.

At this point, however, a data hub per se is often difficult to bring into production unless you have very specific use cases to deliver. In addition, the early view that Hadoop was the best way to build a data hub has changed somewhat with the emergence of tools like Apache Spark and the increasing need to be able to have a big data system that can integrate direct support for business action (operational data) and the ability to analyze that action (analytics). To do that, you need to have persistent data structures that can support low-latency operation. Streams and tables are often the preferred choice for this.

Stream-Based Global Log Processing

Stream-based global log processing is probably the simplest example of a full-fledged data fabric. Large computing systems are composed of processes that transform data (as in ETL processes) or respond to queries (as with databases or web servers), but all of these programs typically record the actions they take or the anomalous conditions they encounter as so-called log events, which are stored in log files. There’s a wealth of insights to be drawn from log file data, but up until recently, much of it has been overlooked and discarded. You can use logs to trigger alerts, monitor the current state of your machines, or to diagnose a problem shortly after it happens, but traditionally, the data has not been saved for more than a short period of time. Common uses of log data include security log analytics for analyzing network and computer intrusions, audience metrics and prediction, and the development of fraud prevention measures.

These log events often record a huge range of observations, such as records of performance or breakage. These records can capture the footprints of intruders or provide a detailed view of a customer’s online behavior. Yet when system diagrams are drawn, these logs are rarely shown. In fact, some people refer to log files as “data exhaust” as though they are just expendable and unwanted pollution. Traditionally, logs were deleted shortly after being recorded, but even if retained, they were difficult to process due to their size. In addition, logs are produced on or near the machines that are doing the processing, making it hard to find all the log files that might need processing.

All that is changing. Modern data systems make it practical to store and process log data: they provide cheap, scalable storage and the ability to process large amounts of data, and message streams make it easy to bring log data back to a central point for analysis.

Traditionally, log processing has been done by arranging an intricate dance between the producers of these logs and the programs that analyze the logs. On the producer side, the tradition has been to “roll” the logs by closing one log file when it reaches an age or size constraint and then start writing to another. As log files are closed, they become available for transport to the analysis program. On the receiving side, this dance was mirrored by methods for signaling exactly which files were to be processed and when programs were to run. Moreover, the output of one process typically was input to another, so this signaling dance cascaded.

This sort of processing can work well enough when all is well, but havoc reigns when something breaks or even when something is substantially delayed. Questions about whether old results had to be recomputed and which programs needed to be run late or run again were very difficult to answer.

More recently, a new pattern has emerged in which log processing is unified around a message-streaming system. The use of a message-streaming style dramatically simplifies the issues of what to do when and how to redo work that is affected by late-arriving data or system failures. This new pattern of processing has proven dramatically better than the old file-shipping style of log analysis.

Figure 5-4 shows the first step in stream-based global log processing. Here, we show web servers as the software generating logs, but it could just as well be any kind of software that produces log files. As log messages are appended to any log file on any machine in the data center, each message is written to a message stream that is on a small cluster. The log messages could be written directly to the log stream instead of to files, but writing the log messages to a local file first minimizes the chance of the write to the stream hanging up the processing.

Figure 5-4. Multiple processes in the same data center can send logged events to a log stream on a consolidator machine. These processes are shown as web servers, but they could be any kind of machine or process that produces measurements or event logs. Similarly, we show these processes and the consolidator as if they are in an on premises data center, but they could as easily be in a single group of machines in a cloud such as Amazon Web Services (AWS) or Google Cloud Platform.

The topics used to write these messages to the log stream are commonly composed using a combination of the data center name, the name of the node running the application generating the log message, and possibly a message type. Message types are commonly used to distinguish different kinds of log lines or to differentiate between different kinds of measurements that might be contained in the log lines.
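A minimal sketch of this topic convention, again assuming a Kafka-style client, looks like the following; the naming scheme, broker address, and message format are illustrative rather than prescriptive.

```python
# Minimal sketch: compose log topics from data center, node, and message type.
# The naming scheme and addresses are illustrative assumptions.
import socket
from kafka import KafkaProducer

DATA_CENTER = "dc1"
NODE = socket.gethostname()

producer = KafkaProducer(bootstrap_servers="consolidator:9092")

def log_event(message_type, line):
    # A topic such as "dc1.web-07.http_error" keeps sources non-overlapping,
    # which is what later lets many streams funnel safely into one central stream.
    topic = ".".join([DATA_CENTER, NODE, message_type])
    producer.send(topic, line.encode("utf-8"))

log_event("http_error", '{"status": 502, "path": "/checkout"}')
producer.flush()
```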

So far, there is little advantage in writing log messages to a stream. The difference becomes clear, however, when message streams are replicated to a central data center, which can be done with minimal configuration or setup, as shown in Figure 5-5.

Figure 5-5. You can replicate the log event streams from Figure 5-4 back to a central stream for analysis. With some data platforms, such replication is built in. For other streaming technologies, such as Apache Kafka, the replication has to be implemented. In any case, having non-overlapping topics at the source allows all of the data to funnel into a single stream.

Exactly how the replication of the stream to the central cluster is done varies depending on what kind of software you use to implement the stream. If you use Apache Kafka, you can configure an application called Mirror Maker to copy messages from one broker cluster to another. If you use MapR as a data platform for the streams, you can instead use the native stream replication that functions at the data-platform level itself.
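For streaming systems that lack built-in replication, the replication step amounts to a consume-and-forward loop such as the sketch below. Kafka users would more commonly run MirrorMaker for this; the broker addresses and topic pattern here are hypothetical.

```python
# Minimal sketch of hand-rolled stream replication to the central cluster.
# Addresses and the topic pattern are hypothetical assumptions.
from kafka import KafkaConsumer, KafkaProducer

source = KafkaConsumer(bootstrap_servers="dc1-consolidator:9092",
                       auto_offset_reset="earliest")
source.subscribe(pattern="dc1\\..*")     # every topic produced in this data center

central = KafkaProducer(bootstrap_servers="ghq-cluster:9092")

for message in source:
    # Because topics do not overlap across data centers, messages can be
    # forwarded verbatim into the central stream under the same topic name.
    central.send(message.topic, key=message.key, value=message.value)
```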

In any case, the central data center (labeled GHQ in Figure 5-5) will have a stream whose contents are the union of all of the messages from any of many processes running in any of many data centers. You can process these messages to aggregate or elaborate them as shown in Figure 5-5. You also can monitor them to detect anomalies or patterns that might indicate some sort of failure.

The important point in this pattern is not just that log messages from all of the processes in all of the data centers can be processed centrally. Instead, it is that you can accomplish the transport of messages to the central point with a very small amount of application code because it is done using core capabilities of the data platform itself. The result is substantially simpler and more reliable than using systems like Flume because message streams as a data construct provide a more opaque data abstraction.

Edge Computing

Gathering data from multiple locations and then processing it centrally, as with the previous design, Stream-Based Log Processing, is well and good, but there are times when you actually need to do something out at the edge with the data you are collecting. That locality of computation might be preferable for a number of reasons, possibly because you need very low latency, because your system has to keep working even if the network fails, or just because the total amount of raw information is larger than you really want to transmit back to headquarters. The type of computation can vary in complexity from simple processing or aggregation to running full-scale machine learning models.

This sort of system has proven useful in connecting drilling equipment, large medical instruments, and telecommunications equipment. In each of the cases we have seen, machine learning models, control software, or diagnostics software runs at each of many field locations on very small clusters of one to five small computers. Data moves back to one or more central systems using file mirroring or stream replication where the data is analyzed to produce operational reports as well as new models for the edge clusters. These new models are then transported back down to the edge clusters so that these new and improved models can be used in the field.

Even without machine learning, you can use edge computing to decrease the amount of data retained. This is commonly done by using a relatively simple anomaly detector on the data so that data that is predictable and, well, boring can be discarded while data that is unpredictable or unexpected is retained and transmitted to the central systems for analysis. This is useful in telecommunications systems for which the total amount of diagnostic data acquired is simply too large to return. It is also commonly done as a form of data compression for process control systems.
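A toy version of this kind of data reduction is sketched below: a trivial predictor tracks each reading, and only readings that it fails to predict are kept for transmission. The smoothing factor and threshold are illustrative; a real deployment would use a properly tuned anomaly detector.

```python
# Minimal sketch of edge-side data reduction: keep only surprising readings.
# The smoothing factor and threshold are illustrative assumptions.
def anomaly_filter(readings, alpha=0.1, threshold=5.0):
    """Yield only readings that deviate from an exponential moving average."""
    prediction = None
    for value in readings:
        if prediction is None:
            prediction = value
            yield value                  # always keep the first point
            continue
        if abs(value - prediction) > threshold:
            yield value                  # unpredictable: retain and transmit
        prediction = (1 - alpha) * prediction + alpha * value

sensor = [20.1, 20.2, 20.0, 20.3, 35.7, 20.2, 20.1]
print(list(anomaly_filter(sensor)))      # the boring points are dropped
```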

As an example, in autonomous car development efforts, the raw data stream can be 2 GB/s or more. The advanced machine learning models that are used to understand the environment and control the car can be used to select data segments that are interesting or novel from the standpoint of the control system thus decreasing the total amount of data by roughly a factor of 1,000 without impairing the quality of the model training process.

Figure 5-6 shows an outline of the common structure of all of these systems.

Figure 5-6. Examples of edge computing. In data center 1, models are used to control data sources d1, d2, and d3. The data produced by these data sources is processed locally by control models but is also sent to a galactic headquarters (GHQ) cluster to be used to train new models. In data center 2, on the other hand, data from sources d4, d5, and d6 are processed by an anomaly detector and only an interesting subset of the raw data is transported back to the GHQ.

Here, both edge computing strategies, model execution and anomaly detection, are shown in a single figure. Most systems would not use a mixture of strategies; we show both here simply for illustration purposes.

Customer 360

The goal of a Customer 360 pattern is to establish a high-performance, consolidated store of complete histories for every customer. When this is done and the entire history for a single customer is viewed as a single consistent list of events, many kinds of processing become enormously simpler. The basic idea is that the nonrelational, highly flexible nature of state-of-the-art big data allows dramatically simpler interpretation of the data without having to join hundreds of tables from incompatible snowflake schemas together.

Having a coherent Customer 360 database makes many analytical tasks much easier. For example, extracting predictive features for event prediction is relatively simple to frame as a computation on the complete history of a single customer. You just need to iterate through a customer's history twice: once to find the events that you want to predict, and once to find out what transactions preceded the event and thus might be predictive. You can also do a variety of ad hoc behavioral queries very easily if you can access all of a customer's history at once. For instance, a mobile operator might know that several cell towers were misbehaving during a certain period of time. With a complete history of each handset, it is possible to find all customers who had dropped calls near the problematic towers at the right time. Reaching out to these customers, possibly with some sort of compensation, could help reduce attrition by showing that you care about the service that they are getting. Both of these examples would likely be too difficult to do relative to the expected value if you had to access a number of separate databases.
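The two-pass computation described above is simple enough to sketch directly; the event format, target event type, and 30-day window below are illustrative assumptions.

```python
# Minimal sketch of two-pass feature extraction over one customer's history.
# Event format, target type, and window size are illustrative assumptions.
from datetime import timedelta

def predictive_features(history, target_type="churn", window_days=30):
    """history: a customer's events, each a dict with 'time' (datetime) and 'type'."""
    # Pass 1: find the events we want to predict.
    targets = [e for e in history if e["type"] == target_type]

    # Pass 2: collect what happened shortly before each target event; those
    # preceding transactions are the candidate predictive features.
    features = []
    for target in targets:
        window_start = target["time"] - timedelta(days=window_days)
        preceding = [e["type"] for e in history
                     if window_start <= e["time"] < target["time"]]
        features.append({"target_time": target["time"], "preceding": preceding})
    return features
```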

The idealized view of one data store to rule them all often gives way a bit to a structure more like the one shown in Figure 5-7. Here, as in the idealized view, many data sources are concentrated into a central store. These streams are accumulated in a reference database that is keyed by a common customer identifier. The records in these streams are nearly or fully denormalized so that they can cross from one machine to another, maintaining their internal consistency and integrity.

Figure 5-7. In a Customer 360 system, all kinds of information for a single customer are collected into a reference database and kept in a way so that customer histories can be accessed very quickly and with comprehensive retrieval of all desired data. In practice, internal customers of the data have specialized enough needs that it pays to extract views of the reference database into smaller, special-purpose subdatabases.

This reference database is stored in a NoSQL database such as HBase or MapR-DB. The key advantage that these databases offer for an application like this is that good key design will allow all of the records for any single customer to be stored nearly contiguously on disk. This means that a single customer’s data can be read very quickly—so fast, indeed, that the inherent expansion in the data caused by denormalization can often be more than compensated by the speed advantage of contiguous reads. In addition, a modern document database can store data whose schema is not known ahead of time. When you are merging data from a number of sources, it helps if you can take data verbatim rather than changing the schema of your reference database every time one of your upstream sources changes.
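The key design can be sketched as follows, using the happybase HBase client as a stand-in for whichever NoSQL database you use; the table name, column family, and key layout are hypothetical.

```python
# Minimal sketch of row-key design so one customer's events sit contiguously.
# Table name, column family, and key layout are hypothetical assumptions.
import happybase

connection = happybase.Connection("reference-db-node")
table = connection.table("customer360")

def put_event(customer_id, timestamp_ms, event_json):
    # Key = customer ID plus timestamp, so a customer's events sort together
    # and a prefix scan reads the whole history nearly contiguously from disk.
    row_key = f"{customer_id}#{timestamp_ms:013d}".encode("utf-8")
    table.put(row_key, {b"e:json": event_json.encode("utf-8")})

def customer_history(customer_id):
    prefix = f"{customer_id}#".encode("utf-8")
    return [value[b"e:json"] for _, value in table.scan(row_prefix=prefix)]
```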

When building a Customer 360 database like this, it is likely that you will quickly find that your internal customers of this data will need specialized access to the data. For instance, one common requirement is to be able to search for patterns in the customer histories using a search engine. Search engines like ElasticSearch fill the requirement for search, but they are not generally suitable for use as a primary data store. The easy middle ground is to replicate a filtered extract of the updates to the main database to the search engine in near real time. You can easily implement this near real-time replication using a Change Data Capture (CDC) pattern, as described later in this chapter.
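In outline, that replication is just a loop that consumes change records, filters them, and indexes the survivors, as in the sketch below. The change stream name, index name, filter, and document layout are all illustrative assumptions.

```python
# Minimal sketch of near real-time replication from a change stream into a
# search engine. Names, the filter, and the document layout are hypothetical.
import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

changes = KafkaConsumer("customer360_changes",
                        bootstrap_servers="reference-db-node:9092")
search = Elasticsearch(["http://search-node:9200"])

for message in changes:
    doc = json.loads(message.value)
    # Forward only the event types analysts actually want to search.
    if doc.get("type") not in ("purchase", "support_call"):
        continue
    search.index(index="customer_events",
                 id=f'{doc["customer_id"]}-{doc["event_id"]}',
                 body={k: doc[k] for k in ("customer_id", "type", "time", "summary")
                       if k in doc})
```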

Another important consumer of Customer 360 data might be a team of machine learning experts building a predictive model. These teams typically prefer no database at all; instead, they prefer to get data in flat files. A common way to deal with this requirement is to run periodic extracts from the main database to get the record set that the team needs into a flat file and then, at least on a MapR system, to use filesystem mirroring to deploy the file or files to the cluster that the machine learning team is using. This method isolates the unpredictable machine load of the machine learning software from the production environment for the reference database. Alternatively, programs like rsync can incrementally copy data from the master machine to the machine learning environment, moving much less data than a full copy would.

Transactionally correct mirroring is not available on HDFS, however, so a workaround is required on Hadoop systems to allow this type of data delivery. The typical approach used on non-MapR systems is to invoke a MapReduce program called distcp to copy the files to the development cluster. Careful management is required to avoid changing the files and directories being copied during the copy, but this alternative approach can make the Customer 360 use case work well on Hadoop systems.

Another common reason for custom extracts is to comply with security standards. The reference database typically contains sensitive information, possibly in encrypted or masked form. Permission schemes on columns in the reference database are used to enforce role-based limitations on who can access data in the database. Different versions of sensitive information are likely stored in different columns to give flexibility in terms of what data people can see. To secure the sensitive information in the reference database even more stringently, it is common to produce special versions of the reference database with all sensitive data masked or even omitted. Such an extract can be manipulated much more freely than the original and can be hosted on machines with lower security profiles, making management and access easier. Security-cleared extracts like this may be more useful even than the original data for many applications.
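The masking step itself can be very simple, as in the sketch below; the field names and masking rules are purely illustrative.

```python
# Minimal sketch of producing a security-cleared extract: sensitive fields are
# omitted or pseudonymized before the copy leaves the reference database.
import hashlib

SENSITIVE = {"ssn", "account_number"}     # omit entirely
PSEUDONYMIZE = {"email", "phone"}         # replace with a stable hash

def cleared_record(record):
    out = {}
    for field, value in record.items():
        if field in SENSITIVE:
            continue
        if field in PSEUDONYMIZE:
            out[field] = hashlib.sha256(str(value).encode()).hexdigest()[:12]
        else:
            out[field] = value
    return out

print(cleared_record({"customer_id": "c42", "email": "a@b.com", "ssn": "000-00-0000"}))
```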

Recommendation Engine

The motivation for building a recommendation engine generally is to improve customer experience by better understanding what will appeal to particular customers. This is done by an analysis of the customers’ preferences communicated through their actions. The improved experience can result in increased sales, longer retention for services, stickier websites, or higher efficiency for marketing spend. In short, happier customers generally result in improved business and customers who find things they like tend to be happier.

Big data systems provide an excellent platform for building and deploying a recommendation system, particularly because good recommendation requires very large datasets to train a model. You can build and deploy a simple but very powerful recommender easily by exploiting search technology running on a data platform. In fact, such a recommender can be much simpler than you might think. Let’s take a look at how that works.

The goal of a recommendation engine is to present customers with opportunities that they might not otherwise find by normal browsing and searching. This is done by using historical user behavior for the entire population of users to find patterns that are then cross-referenced to the recent behavior of a specific user. Recommendations can be presented to users explicitly in the form of a list of recommended items or offers but can also be used more subtly to make a user’s overall experience more relevant to what they want to do. As an example, a “What’s New” page could literally just show new items in reverse chronological order of introduction, or it could show all items introduced recently ordered by a recommendation engine. The latter approach tends to engage users more strongly.

There are two major kinds of recommendation systems that are commonly used in production. One is based on machine learning, typically using an algorithm called alternating least squares. This approach tends to be more complex, and deploying the recommendations requires a specialized recommender engine. The other major kind is based solely on counting how many times different items co-occur in users' activity histories. The result of this co-occurrence analysis is then inserted into a conventional search engine, often a preexisting one, for deployment of recommendations in production. The second approach is much simpler than the first, and the difference in performance (if any) is typically not very significant.

Recommendation systems of this type work by reading large amounts of historical data and doing a large analysis. This analysis is typically run as a batch or offline process because it can take tens of minutes to hours to run. The output of the analysis consists of so-called recommendation indicators and is transferred to a system that can match these indicators to recent behavior of a specific user to make recommendations in real time as soon as new behavior is observed. You can implement the system that makes these real-time recommendations using pretty much any conventional search engine. This implementation choice is very convenient because search engines are often already being used. Another advantage of this design for recommendation is that the more computationally expensive and time-consuming part of the project—building and training the recommendation model—is done offline, ahead of time, allowing recommendations to be made for users in real time, online, as outlined in Figure 5-8.

Figure 5-8. The beauty of this two-part design for a recommendation engine is that by dividing the computation of recommendations into two parts, most of the computation can be done offline. That offline computation prepares information called indicators that a standard search engine can use to deliver customized recommendations in real time. The indicators allow recommendations for users to be created in real time.
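The online half can be as small as a single search query: the user's recent behavior is matched against the indicator field of the item index, as in the sketch below. The index name, field name, and client are illustrative assumptions.

```python
# Minimal sketch of the online half of the recommender: recent behavior is
# used as a query against indicator fields. Names and client are hypothetical.
from elasticsearch import Elasticsearch

search = Elasticsearch(["http://search-node:9200"])

def recommend(recent_item_ids, size=10):
    # Items whose indicator lists overlap the user's recent behavior score
    # highest; ordinary search relevance does the ranking work.
    query = {"query": {"match": {"indicators": " ".join(recent_item_ids)}},
             "size": size}
    result = search.search(index="items", body=query)
    return [hit["_id"] for hit in result["hits"]["hits"]]

print(recommend(["song-123", "song-987", "song-555"]))
```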

The offline part of the computation is shown in Figure 5-9. User behavioral history is analyzed both for co-occurrence of behavior and for cross-occurrence. In co-occurrence, behaviors are compared like-to-like. An example might be that if you want to recommend songs to a listener, you would analyze previous song-listening behavior. To recommend books for purchase, you would analyze previous book purchases. With cross-occurrence, in contrast, you would analyze past behavior of one type to make recommendations of a different type. An example would be using past behavior consisting of reading reviews for a product to recommend purchase of that item or others. Using multiple cross-occurrences together with co-occurrence is a valuable way to improve recommender performance.

Figure 5-9. A rough structure for the offline portion of a recommendation analysis system. Historical behavior is recorded in user behavior logs. These logs are examined to generate recommendation indicators by doing co-occurrence and cross-occurrence analysis. These indicators are inserted into a search engine together with conventional item metadata that would normally have been in the search engine.
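At its core, the offline analysis is co-occurrence counting, as in the sketch below. Production systems typically score the pairs with a statistical test such as the log-likelihood ratio rather than using raw counts, but the shape of the computation is the same; the sample data and cutoff here are illustrative.

```python
# Minimal sketch of offline co-occurrence analysis: the items that most often
# co-occur with an item become its indicators. Raw counts stand in for the
# statistical scoring a production system would use.
from collections import Counter, defaultdict
from itertools import combinations

def indicators(histories, top_k=3):
    """histories: dict of user -> set of item IDs that user interacted with."""
    cooccur = defaultdict(Counter)
    for items in histories.values():
        for a, b in combinations(sorted(items), 2):
            cooccur[a][b] += 1
            cooccur[b][a] += 1
    # Each item's most frequent companions become its indicator list, which is
    # then written into the search engine alongside ordinary item metadata.
    return {item: [other for other, _ in counts.most_common(top_k)]
            for item, counts in cooccur.items()}

histories = {"u1": {"song-123", "song-987"},
             "u2": {"song-123", "song-987", "song-555"},
             "u3": {"song-987", "song-555"}}
print(indicators(histories))
```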

Note that recent research by Schelter and Celebi outlines ways that the offline part of this computation can be done incrementally, possibly even in strict real time. This would allow a percolation pattern to be used to update indicators in a recommendation system within seconds, even as new patterns of behavior emerge or as new content is introduced.

You can find more information on how recommendation engines are built in our previous book, Practical Machine Learning: Innovations in Recommendation (O’Reilly, 2014). That book provides a very short introduction into how to build a recommendation engine and describes the theory and basic practice.

Marketing Optimization

The goal of Marketing Optimization is to understand what causes customers to ultimately buy products across both marketing and sales cycles. In very few businesses, the marketing that gets customers to engage with a company and the sales process that ensues are relatively simple. An example might be a web-only company that has only a few online marketing programs. In contrast, many businesses are at the other extreme and have a large number of marketing contacts with customers, and the sales process consists of many interactions, as well. For businesses with anything but the simplest sales cycles, determining which actions actually help sell things to customers and which either don't help or even impede sales is both difficult and very important. In some cases, a company has enough products that just deciding which products to talk about at which times can make a significant difference to the business.

The best practice for this problem is to first establish as complete a history of interactions with customers as possible. Typically, this takes the form of some kind of Customer 360 database. The simplest marketing optimization system and usually the first one implemented is a recommendation system of some kind. The goal here is to recognize which customers are likely to be in a position where offering a particular product to them is likely to result in a sale or other desired response.

Recommendation systems are very common in online business, but it is unusual to integrate online and offline experiences as inputs to a recommender, and it is unusual to drive recommendations uniformly to both online and offline customer interactions.

The next step in complexity beyond an indicator-based recommendation system is to build per-product sales models. These models can use behavioral features, including recommendation indicators and detailed timing of past transactions and marketing efforts, to attempt to guide the direct sales process by determining which products have the highest propensity to sell if pitched. These models are more complex than the models implicit in a normal recommender, and building them is likely to take a considerable amount of computation, but for complex sales cycles, the results can be very significant. The level of effort to build these models, however, is substantial and should only be undertaken if the product line and sales cycle justify the additional complexity. Simpler search engine–based recommenders are much more appropriate for many companies, including most business-to-consumer companies, both because the sales cycle tends to be simpler and because spending time on complex machine learning development is probably worth it only if there is sufficient leverage to reward the development. Extreme sales volume is one way to provide this leverage; high per-unit net profit is another way. For companies that don't have these factors, it is often much more valuable to spend time adding more logging to user interactions and tuning the user experience to better incorporate recommendations from simpler recommendation systems, instead.

Object Store

The goal of a large Object Store is to store a large number of data objects that need to be accessed individually, often by name, but that are not necessarily of interest for wholesale analysis. This has wide utility in a number of businesses. One common use case is in financial companies where all emails, text messages, phone calls, and even recordings of meetings may be recorded this way and retained for potential use in proving regulatory compliance. In such a case, it is important for all of the saved objects to be accessible via standard file APIs so that standard software for, say, voice-to-text conversion can be used. Another use case is for media streaming for which a large number of video or audio files would be stored in different versions in different encodings. Typically, there would be a large number of thumbnail images or other extracted information associated with each media item. Commonly, these related files would be arranged in directories, and it is very useful to have automated ways to replicate data to other clusters in other data centers and to allow web servers to read files directly.

In terms of how the objects in an object store are stored, the simplest possible implementation is to store all objects as individual files, one per object, rather than in a database. This approach is particularly appropriate because large objects, often over a megabyte on average, are relatively common. Often the underlying purpose for a large object store is to provide access to media such as videos or audio recordings; sometimes the objects have to do with messaging systems or systems data. Typically, the number of objects is in the tens of millions to tens of billions, and the sizes of the objects involved range from tens of kilobytes to hundreds of megabytes. One common requirement is to make objects accessible on the web. Downtime is typically unacceptable in these systems, and 99.9th percentile response, exclusive of transmission time, must typically be in the tens of milliseconds.

Objects in systems like this often come from a very large number of internet-connected devices. These devices are commonly the primary consumer of these objects, but large scans of objects are common requirements, as well. For instance, if you are building a video serving site on a large object store, it will occasionally be necessary to transcode files into new formats or to extract thumbnail images or run image classifiers. In media systems, the total number of files is typically much larger than the number of individual videos being served because of the requirement to have multiple encoding formats at multiple bit rates along with additional media assets like thumbnail images and preview clips. A good rule of thumb is to expect roughly 100 times more files than you have conceptual objects such as a video. Decoding audio to text is another common use.

Traditionally, large object stores have been built on top of special storage hardware at very high cost, or purpose built using a combination of databases (to store object locations) and conventional file systems at very high cost in operational complexity.

You can use Hadoop systems to create an object store, but they aren’t very good at it because they cannot store very many objects. With an HDFS-based system, a completely file-oriented implementation will work only at moderate to small scale due to the file count limit that comes from the basic architecture of HDFS. Depending on the file size distribution, you might be able to use a combination of HBase to store smaller files and HDFS to store larger files. Any HDFS-based solutions will require special software to be written to translate requests into the HDFS API.

With a MapR-based system, in contrast, you can simply use the system as a very large file system because the very large number of files and their potentially large size are not a problem for MapR-FS. Using NFS or a direct POSIX interface to allow direct access by conventional web services also works well with such a solution. Such files can also be accessed using the Amazon Web Services’ Simple Storage Service (Amazon S3) API. Any of these options works as a stable production system.
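As a sketch of what access through the S3 API looks like, the following uses boto3 against an S3-compatible endpoint; the endpoint, bucket, and key layout are hypothetical, and the directory-like keys reflect the fan-out from one conceptual video into many related objects.

```python
# Minimal sketch of object access through an S3-compatible API.
# Endpoint, bucket, credentials, and key layout are hypothetical assumptions.
import boto3

s3 = boto3.client("s3", endpoint_url="https://object-store.example.com")

# One conceptual video fans out into many related objects (renditions,
# thumbnails), so keys are arranged like directories.
s3.put_object(Bucket="media",
              Key="videos/v0001/renditions/720p.mp4",
              Body=open("720p.mp4", "rb"))
s3.put_object(Bucket="media",
              Key="videos/v0001/thumbnails/0001.jpg",
              Body=open("thumb.jpg", "rb"))

# A web server can read an object straight back for serving.
obj = s3.get_object(Bucket="media", Key="videos/v0001/renditions/720p.mp4")
data = obj["Body"].read()
```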

Stream of Events as a System of Record

The use of a message streaming system as a log of business events, and even as the primary system of record, is showing up in more and more businesses lately. The basic idea is that as the business progresses, you keep a history of high-level business events in a message stream. This approach has the virtue that the stream contains what amounts to a complete and relatively easily understood history of the business at all times in the past. This makes it enormously easier to replay the past, but it may not be obvious how to implement such a system well. The final result can be fairly simple, however, and getting a good streaming backbone in place can really help you react to business microcycles better than traditional systems do.

The motive for doing this was described in Chapter 1, where we talked about measuring the performance of outgoing emails. In that example, it was clear that we needed to do analyses that are not known a priori and which require detailed knowledge of events such as sending and receiving emails. In Chapter 3, we had another example with fraud control in which we needed to determine what we knew as of a moment when fraud could still be prevented. Event orientation was important then because we typically don't know ahead of time what patterns of fraud we want to control (the thieves don't copy us on the memos when they invent new schemes).

A publicly available example of an event-based system of record is NASDAQ, which retains all transactions that affect the order book for equities and can perform a complete replay of these events down to the exact millisecond. They even allow public access to this event stream (for a price, of course). Externally, this is advantageous because it allows regulators, brokers, and traders to see exactly how trades were executed, but the fact that a precise history of events is kept has internal benefits, as well. Most notably, the production of a complete historical log of business events makes it much easier to validate that internal processes and software perform exactly as intended. You also can run new versions against historical events to demonstrate that they also behave exactly as specified.

The precise technology used by NASDAQ is, not surprisingly, not described publicly, and implementing a fair exchange system that creates such a log of operations reliably while responding correctly to all transactions within hard real-time limits is difficult. The fact remains, however, that if you create a stream of business events, it is very easy to reuse that stream for all kinds of purposes, and those purposes can have high value to the business. It is also clear that having such a log in terms of actual business events rather than in a form that exposes the details of how internal systems are actually implemented has substantial advantages, not least because internal system details that are well hidden can be changed without external repercussions.

One of the key aspects of a stream of business events is that you can reconstruct the state of the business at any point in time. This is illustrated in Figure 5-10 in which an incoming stream of business events is used to populate a database, db1, which contains a current state of the business. A replica of the event stream is used to populate a second database, db2, which is paused at some past time presumably so that the state of the business back then can be interrogated.

Figure 5-10. You can use business events to populate databases with current estimates of the business state. Pausing the update of the database can give snapshots of historical states. Ordinary databases normally cannot give you this historical view.

But the capabilities of this approach go further. The two databases don’t even need to record the same sort of state. One might be simply counting transactions; the other might be keeping account balances.
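The replay itself is little more than folding events into state and stopping at a chosen cutoff, as in the sketch below; the event format and the two kinds of state computed are illustrative assumptions.

```python
# Minimal sketch of rebuilding business state from the event stream, paused at
# a chosen point in time (db2 in Figure 5-10). Event format is hypothetical.
from datetime import datetime

def replay(events, until=None):
    """Fold events into two different views of state from the same stream."""
    balances = {}           # one kind of state: account balances
    transaction_count = 0   # a completely different kind: a simple count
    for event in events:
        if until is not None and event["time"] > until:
            break           # pausing here yields the state as of `until`
        balances[event["account"]] = balances.get(event["account"], 0) + event["amount"]
        transaction_count += 1
    return balances, transaction_count

events = [
    {"time": datetime(2023, 6, 1), "account": "a1", "amount": 100},
    {"time": datetime(2023, 6, 2), "account": "a1", "amount": -30},
    {"time": datetime(2023, 6, 3), "account": "a2", "amount": 50},
]
print(replay(events, until=datetime(2023, 6, 2)))   # a historical snapshot
```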

Figure 5-11 presents a more complete picture of the common design idiom for such a streaming system of record that could be used for the email, fraud, or NASDAQ use cases. The idea here is that multiple critical business systems take responsibility for writing any business events to a stream (labeled "Events" in the figure) before they are confirmed back to the origin of the event. Moreover, all such systems undertake to only update any databases local to the system from the event stream itself. The event stream becomes the system of record, superior in authority to any of the databases. If one of the critical systems fails, it is assumed that subsequent user requests will be re-routed to any surviving systems.

Figure 5-11. Critical subsystems are systems that both use and update state, emitting business events in the process. Each such system sends all such events to a business-wide stream of record that can provide a scalable and inspectable implementation of critical business processes. You need to consider possible race conditions, but you can avoid this potential problem with good design.

For many applications, this kind of design is perfectly adequate just as it stands, say, for handling updates to a user’s profile on some web service. The reason is simply that updates for a single user occur at relatively long intervals (typically seconds) and will almost always be routed to a single critical system. Indeed, the total update rate might be so slow that only one such system need be active at a time. Even if some updates were lost due to failure of a communications or a critical system while making an update, that would not be the end of the world (or the business). Typically, the user would simply repeat the update after being notified of a timeout.

To handle critical applications that cannot have duplicate transactions, such as online banking, it is common to add a unique ID to each transaction that is processed by the system. You should add this ID as far back in the system as possible, preferably right next to the origin of the transaction. In the case that everything goes well, the transaction will be confirmed back to the origin. It is relatively simple to design the system so that transactions are confirmed only when they have succeeded and also been written successfully to the event stream.
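A sketch of that discipline is shown below: the unique ID is attached at the origin, the event is written to the stream, and confirmation is returned only after that write succeeds. The stream name, broker address, and transaction format are hypothetical.

```python
# Minimal sketch of confirm-after-write with an origin-assigned transaction ID.
# Stream name and broker address are hypothetical assumptions.
import json
import uuid
from kafka import KafkaProducer

events = KafkaProducer(bootstrap_servers="stream-cluster:9092")

def submit(transaction):
    # The unique ID is attached as close to the origin as possible so that
    # downstream consumers can recognize and discard duplicates after a retry.
    transaction["txn_id"] = str(uuid.uuid4())
    future = events.send("business_events",
                         json.dumps(transaction).encode("utf-8"))
    future.get(timeout=10)    # block until the write to the event stream succeeds
    # Only now is the transaction confirmed back to its origin; local databases
    # are updated by consuming the same stream rather than being written here.
    return {"confirmed": True, "txn_id": transaction["txn_id"]}
```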

For cases in which communications or a critical system fails before, during, or after the processing of a transaction, however, the user might be left in doubt about whether the transaction has succeeded. In such a case, after communications to any critical system is restored, the user can be prompted to inspect the recent history of transactions to verify whether the transaction in question was applied. Of course, not all business systems can be designed to allow this user-driven inspection and corrections process, but if it is possible, this kind of design can be very robust while still very simple.

It should be noted, however, that if more than one critical system accepts transactions at a time, there is a potential for self-contradictory events to be pushed into the event stream. Avoiding this in the general case can be quite difficult, but in many practical situations, you can minimize the impact of this risk. For example, if all events involve just one account, user profile, or package location, you can build a very reliable system this way. In systems for which extreme availability is important (such as with package locations coming from mobile bar code readers), you can build a very reliable system by making sure that all updates are intrinsically ordered, possibly by including an accurate time in each event. That allows events to be buffered at multiple stages, possibly redundantly, but still allows correct integration of delayed messages and events. For cases in which events are generated by devices with integrated GPS receivers, accurate times are easily available, which makes this much easier.

Many businesses also have situations in which orders and stock levels need to be coordinated. This is commonly used as an example of a case for which traditional database semantics are required. As a practical example, consider a situation with customers who want to reserve seats at concerts. Suppose you have business constraints such that you don’t want to allow any customer to have more than one order pending, nor to buy more than five seats at any one concert, nor to buy seats at more than four upcoming concerts. You also don’t want any seat at any concert to be sold to more than one customer. The problem here is that you have two kinds of read-modify-write operations (one against customers, one against seats) that seem to require an atomic transaction against two kinds of state, one for customers and one for seats. Each kind of state would traditionally be stored in different database tables. Using a database this way can be a classic scaling bottleneck.

In fact, the real-world business process is anything but atomic because it involves not just a database, but customers, as well (and humans don’t support ACID transactions). Customers need to see available seats before they select them and need to select seats one by one before they commit to buying them. It takes a bit of time for them to actually complete the transaction. Abandoned purchases are also distressingly common, but these must not lock out other buyers forever. Customarily, the way that this is handled is that customers have an understanding that once they have selected some seats to purchase, they have a short period of time in which to buy them, typically a few minutes. It is also common that there is a rate limiter so that individual users are not allowed to perform actions faster than humanly possible. As such, the system can be reduced to updates to a user’s state (commit to buy) and updates to seat states (temporarily commit a seat to a user, purchase a seat, release a seat). We can implement this business process by having one service for managing user state and another for managing seat state, as shown in Figure 5-12. Each of these systems follows the pattern previously shown in Figure 5-11, in which critical systems perform updates with limited extent and each system sends all business events to a shared stream.

Figure 5-12. Cooperating services can create a good user experience without any multirow atomic updates to both user and seat reservations. This allows the user update service and the seat reservation update service to be sharded by user and seat, respectively, allowing scalability by limiting the scope of any updates. Sharding this way without global atomicity gives good scalability with relatively simple design. The user interface monitors the event stream so that appropriate updates can be made to user visible state via a mechanism such as web sockets.
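
To illustrate how the per-customer constraints can be enforced inside a single user shard, here is a minimal sketch (not from the book; the record layout and function names are illustrative). Because every rule mentions only one customer's own state, the check reads and writes a single record:

    # A minimal sketch of the per-customer rules from the ticket example.
    # Every rule refers only to this customer's own record, so the check
    # touches a single row and the service can be sharded by user ID.
    MAX_SEATS_PER_CONCERT = 5
    MAX_CONCERTS = 4

    def check_user_order(user, concert_id, num_seats):
        """`user` is the single sharded record for this customer, e.g.
        {"pending_order": False, "seats": {"concert-42": 2}}."""
        if user["pending_order"]:
            return False, "another order is already pending"
        if user["seats"].get(concert_id, 0) + num_seats > MAX_SEATS_PER_CONCERT:
            return False, "more than five seats at one concert"
        if concert_id not in user["seats"] and len(user["seats"]) >= MAX_CONCERTS:
            return False, "seats at more than four upcoming concerts"
        return True, "ok"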

In the process of selling tickets to users, the reservation interface makes synchronous remote procedure calls (RPCs) to the user update service or to the seat reservation update service. Both services write the resulting events to the event stream just before returning a result to the interface, and those same events are used to update the state inside each service. The states for a user or a seat reservation evolve as shown in Figure 5-13. The user interface signals the intent to purchase tickets to the user state service, which records the intent internally or denies it if a different purchase session is already in progress.

Figure 5-13. The user and seat reservation state machines are driven by synchronous calls from the reservation user interface to the user and seat reservation services. All call results are sent to the event stream so that the state can be updated cleanly by any other service. The asterisk on the ui/start call to the user state indicates that the request would fail if a conflicting session were already in progress and hadn’t timed out.
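
The write-before-return behavior and the ui/start check can be sketched as follows (a minimal illustration, not from the book; event_stream stands in for whatever stream or topic API your platform provides):

    # A minimal sketch of the write-before-return pattern for the user state
    # service. `event_stream` is a placeholder for your platform's stream API.
    import json
    import time

    class UserStateService:
        def __init__(self, event_stream, session_timeout_s=300):
            self.sessions = {}                 # user_id -> session start time
            self.event_stream = event_stream
            self.session_timeout_s = session_timeout_s

        def start_session(self, user_id):
            """Handle a ui/start call: fail if a conflicting session is still
            in progress and hasn't timed out, otherwise record a new session."""
            now = time.time()
            started = self.sessions.get(user_id)
            ok = started is None or now - started >= self.session_timeout_s
            if ok:
                self.sessions[user_id] = now
            event = {"type": "ui/start", "user": user_id, "ok": ok, "ts": now}
            # The event goes to the shared stream just before the RPC result
            # is returned, so any other service can rebuild this state.
            self.event_stream.append(json.dumps(event))
            return event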

Similar synchronous calls to the seat reservation service would cause a seat to be held, purchased, or released. A seat can be held only by a user with a valid session, and a seat can be bought only if it has a valid hold that hasn't timed out. The use of holds in this fashion serves a dual purpose: it matches the desired user experience and it avoids race conditions between users, seats, and the user interface.
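
Here is a minimal sketch (not from the book; the timeout value and record fields are illustrative) of the seat-side rules, in which each call reads and writes only a single seat record:

    # A minimal sketch of the seat-side rules: a seat can be held only while no
    # live hold exists, and bought only while its own hold is still valid.
    import time

    HOLD_TIMEOUT_S = 300        # "a few minutes"; an illustrative value

    def hold_seat(seat, user_id, now=None):
        now = time.time() if now is None else now
        if seat["state"] == "sold":
            return False
        if seat["state"] == "held" and now - seat["held_at"] < HOLD_TIMEOUT_S:
            return False                      # someone else's hold is still live
        seat.update(state="held", holder=user_id, held_at=now)
        return True

    def buy_seat(seat, user_id, now=None):
        now = time.time() if now is None else now
        if (seat["state"] == "held" and seat["holder"] == user_id
                and now - seat["held_at"] < HOLD_TIMEOUT_S):
            seat["state"] = "sold"
            return True
        return False                          # hold missing, expired, or not yours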

Using time-limited reservations in this way is a classic method for factoring apart the updates to user and seat state and can be applied in a variety of situations beyond simple seat reservation systems. The practical effect of factoring transactions apart with reservations is that we can shard databases on a single entity so that no updates involve more than a single row. This allows the system to be scaled in terms of both number of entities (users and seats) and transaction rate.

Building a system like this so that it is reliable in a practical sense is often quite different from designing a theoretically correct system. In a practical system, there are usually bounds on how long an event can be in flight before it is considered to have failed, or on how quickly events for a single entity can occur. These bounds are all about real, measurable time, whereas most theoretical descriptions of database correctness specifically disallow any consideration of timing. Using time effectively in our systems, however, can give us substantial practical advantages in terms of simplicity and scalability without compromising real-world correctness.

Deduplication of events can make similar use of such timing bounds to limit how many pending events must be kept in a deduplication buffer. Keep in mind that the goal in a production system is to drive the probability of system failure below an acceptable limit. This typically means that the system is designed neither purely for maximum availability nor for absolute consistency; rather, it trades off the consequences of different kinds of failure to find a business-optimal, user-friendly middle ground.
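
As a small illustration of how a timing bound keeps a deduplication buffer from growing without limit, here is a minimal sketch (not from the book; the names are illustrative). Once an event ID is older than the maximum time an event can be in flight, it can never legitimately reappear, so it is safe to forget:

    # A minimal sketch of a deduplication buffer bounded by time.
    from collections import OrderedDict

    class Deduplicator:
        def __init__(self, max_in_flight_s):
            self.max_in_flight_s = max_in_flight_s
            self.recent = OrderedDict()        # event_id -> event time

        def is_new(self, event_id, event_time):
            # Expire IDs that are older than the in-flight bound; they can
            # never legitimately be delivered again.
            while self.recent:
                _oldest_id, oldest_ts = next(iter(self.recent.items()))
                if event_time - oldest_ts > self.max_in_flight_s:
                    self.recent.popitem(last=False)
                else:
                    break
            if event_id in self.recent:
                return False                   # duplicate delivery
            self.recent[event_id] = event_time
            return True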

Table Transformation and Percolation

In some cases, it can be impractical to change a working system so that it uses a streaming system of record. In these situations, it can still be very useful to apply a related pattern in which a stream directly tracks changes to the contents of a table, even though the code making those updates remains unchanged. Another situation in which this is useful is when a table is already being used by a service, but we want to allow access to some version of that table without accidentally corrupting the original or imposing any load on it. A third situation is when we want to be able to restore the database as it stood at any past time or to replay its evolution from earlier points. Figure 5-14 shows the basic idea.

Figure 5-14. With changed data streaming, changes made to a master table are put into a stream (labeled Changes here). You can use this change stream to create snapshots of the table with each snapshot copy corresponding to an offset in the change stream. Clones of the master table can also be created via a transformation. You can even implement these clones by using completely different technology than the master table, but they can still be initialized using snapshots from the snapshot service.

In this pattern, one table is considered the master copy of the data. Changes to this table are inserted into a stream using whatever change data capture (CDC) mechanism is available for the master table. These changes are best stored in a stream so that you can back up and replay them at any time. The first consumer of these changes shown in the figure is a so-called snapshot service. The point of the snapshot service is to make and save copies of the master table at different points in time. Each snapshot records the stream offset of the last change it includes. This allows you to quickly construct a current copy of the database by copying a snapshot and replaying all changes starting from the offset recorded with it. This initialization is illustrated in Figure 5-14 with a dashed line.
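
The snapshot-and-replay idea can be sketched as follows (a minimal illustration, not from the book; the change_stream.read_from call and the CDC record shape are assumptions standing in for whatever your platform provides):

    # A minimal sketch of snapshot-and-replay. A snapshot stores the table
    # contents plus the offset of the last change it includes; a clone is
    # rebuilt by copying the snapshot and replaying changes from that offset.
    import copy

    def take_snapshot(table, last_offset):
        return {"offset": last_offset, "rows": copy.deepcopy(table)}

    def apply_change(table, change):
        # Assumed CDC record shape: {"op": "upsert"|"delete", "key": ..., "row": ...}
        if change["op"] == "delete":
            table.pop(change["key"], None)
        else:
            table[change["key"]] = change["row"]

    def restore(snapshot, change_stream):
        """Rebuild a current copy of the master table. `change_stream.read_from`
        is a placeholder for reading the change stream from a given offset."""
        table = copy.deepcopy(snapshot["rows"])
        for change in change_stream.read_from(snapshot["offset"] + 1):
            apply_change(table, change)
        return table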

The key advantage of this pattern is not so much making identical copies of the table; rather, it is creating functional clones of the master table that include transformed or selected values from the master, or that are stored using a different technology than the master table. For instance, a clone might include only a few columns from the master. Alternatively, a clone might be stored in a very fast technology that is not robust to failures. Such a failure would not be a huge problem because you could easily reconstitute the in-memory form of the table from snapshots. Another option would be to have the clone be heavily indexed to support, say, full-text search.
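
A functional clone that keeps only a narrow projection of the master can be sketched like this (a minimal illustration, not from the book; the column names and CDC record shape are assumptions):

    # A minimal sketch of a functional clone that keeps only a narrow
    # projection of the master table; the clone lives in memory and can be
    # reconstituted from a snapshot plus the change stream after a failure.
    WANTED_COLUMNS = ("customer_id", "lifetime_value")    # illustrative columns

    def update_clone(clone, change):
        """Apply one CDC record (same assumed shape as above) to the clone."""
        if change["op"] == "delete":
            clone.pop(change["key"], None)
        else:
            row = change["row"]
            clone[change["key"]] = {c: row.get(c) for c in WANTED_COLUMNS}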

This kind of design is not as useful as keeping business-level events in a stream. The reason is that table updates tend to be at a much lower level of abstraction and, as such, are harder to interpret in the context of the business. In many cases, a single business event causes multiple table updates, and the updates for different business events may be intertwined. As a result, it is relatively easy to translate from business events to table updates, but often quite difficult to reverse the process. On the other hand, the table cloning pattern is architecturally much less invasive and thus easier to get implemented. Cloning is particularly handy when you have lots of consumers who cannot handle the full detail of the master table and want only an extract containing some rows or columns, or who want a fast materialized aggregate value. In Figure 5-14, for example, Clone 1 and Clone 2 need not contain the same data at all.

The idea of selective or transformed clones of a table is particularly useful when you have a very detailed customer 360 system. In such systems, the customer tables with all previous transactions are too large and detailed for most internal consumers. To simplify the use of such a system, it can help to provide drastically simplified clones that are automatically kept synchronized with the master table.

It is also possible to adapt the table cloning pattern to implement so-called percolation computations. The basic idea is that the data inserted into the master table is only skeletal in nature. Each time such a skeleton is inserted, however, a consumer of the change stream elaborates the skeleton data and inserts additional information into the original entry, as illustrated in Figure 5-15.

Figure 5-15. With percolation, a change capture stream is used not for replication, but to trigger further changes in a table. This allows the data source to insert (or change) only the barest minimum of data in the master table. Subsequent enrichment or aggregation can be done by the percolator.

Percolation can be very handy when you have a system that inserts data into a table, but you don't want that system to spend the time computing the fully elaborated form of the data. With percolation, the basic data source can be kept very simple, and all the work of elaborating the data beyond its original form can be deferred out of the critical path. Percolation can also become a dangerous pattern if the changes are not idempotent or if the percolated changes cause further percolation. If you find yourself building elaborate chains of percolation, you will probably be happier with the streaming system of record pattern.
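
A percolator that guards against re-triggering itself can be sketched as follows (a minimal illustration, not from the book; the enriched flag and the enrich callback are assumptions):

    # A minimal sketch of a percolator. It consumes the change stream and
    # elaborates skeletal rows in place; marking each row it has processed
    # keeps the enrichment idempotent and prevents it from re-triggering itself.
    def percolate(change, table, enrich):
        row = table.get(change["key"])
        if row is None or row.get("enriched"):
            return                              # already elaborated; do nothing
        extra = enrich(row)                     # e.g., lookups, geocoding, scoring
        row.update(extra)
        row["enriched"] = True                  # the idempotence guard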
