3
A Minimal Data Processing and Management System

After reading this chapter, you should be able to:

  • Use Unix tooling for data processing
  • Build a pipeline to extract information
  • Automate common data tasks
  • Use PostgreSQL for large data

Overcomplicating a problem is a general issue in computing; it is hard to "keep it simple, stupid." Experience and time are essential to formulating solutions that are good enough for the problem at hand. In this chapter, solutions for relatively large data are developed using standard tools and open-source software.

3.1 Problem Definition

Defining a problem and working through it with different strategies is the focus of this chapter. Let us assume we are hosting an online book store.

3.1.1 Online Book Store

Imagine a scenario where we have an online book store, where consumers buy books directly from the seller in real time. The store has an interface for searching books and a landing page with some recommended books. As users navigate through the website, they can add books to the shopping basket. At any time, the user can decide to buy the books and check out. If the user is not registered, the website first attempts to register the user. After that, the user can choose a shipping address. Finally, the website offers payment options.

3.1.2 User Flow Optimization

Special attention is given to user flow optimization. We aim to gain a better understanding of user behavior throughout checkout. Once some log data are obtained, user behavior can be analyzed, and the website experience can be optimized in the areas that need improvement.

Let us say we have application logs that provide timestamp, host, IP, HTTP method, unique cookie per user, path, HTTP status code, content length, request time in milliseconds, HTTP referrer, and user agent. In reality, many more request metrics might be logged. Nevertheless, we keep it short here to make it simple:

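As an illustration, a single request log line might look like the following; the exact format depends on the logging configuration, and the 14-character [request_log] prefix, host name, cookie, and path values are assumptions used in the sketches that follow:

[request_log] 2020-01-14T23:59:01 web-01 203.0.113.7 GET u1a2b3c4 /basket/add 200 512 34 https://www.example-books.com/book/42 Mozilla/5.0 (X11; Linux x86_64; rv:72.0) Gecko/20100101 Firefox/72.0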

3.2 Processing Large Data with Linux Commands

In this section, a problem is presented that requires access to log files from a number of servers. Standard Linux commands and additional helper commands are used to process the data in different ways.

3.2.1 Understand the Data

One of the first things to do is to understand the data. We previously explained what the log files contain. However, such a description might not be readily available, so we may have to go to the source to acquire the related information. When writing an application that parses logs, it is suggested to link to the source file where the logs are generated.

After studying the data, we select the parameters that might be helpful and filter out the rest. In our example, all of the details are necessary since it was designed this way; in other cases there could be details that are not needed. Either way, the server will have many other lines in the logs that are not important here, so we have to skip those lines and extract only the ones we need.

3.2.2 Sample the Data

To write the parsing rules, a minimal amount of data is taken from the logs of one of the servers. In our case, we have a Python Flask application that logs to the /data/logs directory. Imagine we have SSH access to the Linux server and read permissions on the directory and the log files. The logs would potentially be gzipped. Let us take a look at our directory and see what we have there:

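A quick look over SSH might go like the following sketch; the host name and file names are illustrative, and the actual directory contents will differ:

$ ssh web-01 'ls -lh /data/logs'
-rw-r--r-- 1 app app 1.1G Jan 12 00:05 app.log.2020-01-11.gz
-rw-r--r-- 1 app app 1.2G Jan 13 00:05 app.log.2020-01-12.gz
-rw-r--r-- 1 app app 1.2G Jan 14 00:05 app.log.2020-01-13.gz
-rw-r--r-- 1 app app 2.3G Jan 14 18:40 app.log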

One thing we can do is copy some of this data to another directory so that we can work on a parser that filters the access logs from the rest of the logs. After that, a parser can be implemented that transforms the logs into the desired format.

3.2.3 Building the Shell Command

We took a look at the data, sampled it, and saved it to another log file. Now, let us get rid of everything but the request logs with the following command:

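A sketch of that filtering step, assuming the sample was saved as sample.log.gz and that request lines carry the 14-character [request_log] prefix shown earlier (both names are assumptions):

zgrep '^\[request_log\]' sample.log.gz | cut -c15- > requests.log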

zgrep takes the lines starting with the request log prefix, and cut removes the first 14 characters. The columns are then parsed into a more usable format such as comma-separated values (CSV). So, the following awk command is executed, where we print the first 10 columns and the remainder as the last column. Since a unique cookie is assigned to each user, the various phases of user activity on the website can be seen. By default, the logs are already sorted by the order of events, so sorting them manually is unnecessary:

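A hedged sketch of that conversion; it reads the filtered lines from the previous step and prints the first 10 whitespace-separated columns plus the remainder (the user agent) as the last comma-separated field (file names are illustrative):

awk '{
  out = $1
  for (i = 2; i <= 10; i++) out = out "," $i
  rest = $11
  for (i = 12; i <= NF; i++) rest = rest " " $i
  print out "," rest
}' requests.log > parsed.csv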

The next step is finding ways to optimize the website. Looking at the log data, there may be several factors that make the website less attractive. The following analyses are considered to see whether user activity is affected by different variables:

  • The number of errors per page view
  • The ratio of activity between browser types

For the error counts, the status code is used. In this command, the sixth and ninth columns are first extracted: the sixth column holds the path (the HTTP address), and the ninth column holds the status code. Then, the sort command is used to sort the extracted pairs, followed by the uniq command to get counts per address and status pair. We could now sort by page views; however, this would be unnecessary since users obviously go to certain pages more often. A better way is probably to get an overall count and compare the ratios. For example, if we get more 500s for adding items to the basket, then we might try to improve that part:

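A minimal sketch of that pipeline. The column positions are assumptions taken from the description above (adjust path_col and status_col to the actual parsed format), and the output file name is illustrative:

awk -F',' -v path_col=6 -v status_col=9 \
    '{ print $path_col "," $status_col }' parsed.csv \
  | sort \
  | uniq -c > path_status_counts.txt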

The next thing to do is to extract the browser type along with the page view. This might be useful since there might be some difficulty in serving pages for certain browser types. This command is a bit trickier to construct since user agent parsing can be troublesome. In the first awk operation, the input is converted into a comma-separated pair, and any commas are removed from the second column. In the second one, we try to find a browser type by matching against well-known browsers. This is a best-effort script, and it only partially solves the problem. User agent parsing is not easy, but this is believed to be good enough for the job:

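A best-effort sketch of that extraction, assuming the CSV layout produced earlier (path in the sixth column, user agent from the eleventh column onward); the match order matters because Chrome user agents also mention Safari:

awk -F',' '{
  ua = $11
  for (i = 12; i <= NF; i++) ua = ua $i   # re-join the user agent and drop any commas in it
  print $6 "," ua
}' parsed.csv \
  | awk -F',' '{
      browser = "other"
      if ($2 ~ /Firefox/)     browser = "firefox"
      else if ($2 ~ /Edg/)    browser = "edge"
      else if ($2 ~ /Chrome/) browser = "chrome"
      else if ($2 ~ /Safari/) browser = "safari"
      print $1 "," browser
    }' \
  | sort | uniq -c > path_browser_counts.txt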

Now that we have collected some data, we can write the output to a temporary file. Nevertheless, the above commands were only executed on a subset of the data, whereas ideally we want to execute them against all data on all servers. In Section 3.2.4, we will see how that is done.

3.2.4 Executing the Shell Command

Two shell commands were created in the previous section, but the goal is to go even further than that. The commands work fine on their own, but we also want to execute them on every server and save the results locally. Once the results are on the local machine, we can start merging them. The key step from the previous section is the reduction of the data size by eliminating many log lines, extracting fields, and counting; we are able to download the data locally because it became much smaller. Let us see how we can execute our commands on all servers where our application is running.


A small script will be written that connects to each host, executes the commands previously discussed, and writes the result to a folder. It reads the hostnames from a file called hosts, but one could also feed the hostnames directly. Moreover, it compresses the data because the goal is to decrease the amount of data transferred. As a result, we get all the parsed and compiled data from all servers in one folder:

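A sketch of such a collection script; it assumes passwordless SSH access, a hosts file with one hostname per line, and that the pipelines built earlier are deployed on the web servers as analyze_status.sh and analyze_browser.sh (all names and paths are illustrative):

#!/bin/bash
# collect_logs.sh -- run the per-server analysis remotely and download the compressed results.
set -euo pipefail

mkdir -p results

while read -r host; do
  # analyze_status.sh and analyze_browser.sh hold the zgrep/cut/awk pipelines
  # from the previous section; ssh -n keeps ssh from consuming the hosts file.
  ssh -n "$host" 'bash /data/scripts/analyze_status.sh | gzip'  > "results/${host}_path_status.gz"
  ssh -n "$host" 'bash /data/scripts/analyze_browser.sh | gzip' > "results/${host}_path_browser.gz"
done < hosts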

Since all of the data is now available locally, we can merge all of it into one file using cat and awk commands. The cat command outputs all of the files as one stream; since the files are compressed, zcat is used, which uncompresses them as it concatenates. The first awk statement simply rewrites the values as comma-separated values, and the second awk statement sums the counts per unique path and browser (or path and status) pair, as sketched below.
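
A minimal sketch of that merge for the browser counts (the status-code files are handled the same way), assuming the downloaded files contain the uniq -c output produced on each server:

zcat results/*_path_browser.gz \
  | awk '{ print $2 "," $1 }' \
  | awk -F',' '{ sum[$1 "," $2] += $3 }
               END { for (k in sum) print k "," sum[k] }' \
  > merged_path_browser.csv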

3.2.5 Analyzing the Results

All the data needed is now merged and readily available to use. We have either the path, status code, and count or the path, browser type, and count. It is interesting to see whether there are relatively higher ratios between pages with respect to status code and browser type, whether there are more errors for certain paths, and whether some pages are visited less frequently with certain browser types.


Let us find the percentage of each status code or browser type. In the first awk statement, we drop the path and count only the browser type or status code. In the second awk statement, we simply sum everything and print percentages by dividing each count by the sum:

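A sketch of the percentage calculation for browser types (status codes work identically), assuming the merged rows are path,browser,count as above:

awk -F',' '{ sum[$2] += $3 } END { for (b in sum) print b "," sum[b] }' merged_path_browser.csv \
  | awk -F',' '{ count[$1] = $2; total += $2 }
               END { for (b in count) printf "%s,%.2f%%\n", b, 100 * count[b] / total }'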

Now that the percentages are obtained, we can search for outliers in the data, for example, paths with a much higher rate of error status codes, or paths that are called much less with a certain browser type. The awk statement sums the results by browser type, by path, and by the combination of the two. It then computes the percentage for any given browser type and path, and the results are sorted by percentage afterward. We can now quickly see whether there is an immediately visible outlier. In the browser type example, it shows that some paths get no hits at all from Firefox.
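
A minimal sketch of that outlier search, again assuming path,browser,count rows; it prints, for every browser, the share of its traffic that each path receives, sorted by that share:

awk -F',' '{
  total[$2]        += $3
  combo[$1 "," $2] += $3
}
END {
  for (key in combo) {
    split(key, parts, ",")
    printf "%s,%.2f\n", key, 100 * combo[key] / total[parts[2]]
  }
}' merged_path_browser.csv | sort -t',' -k3,3 -rn > outliers.csv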

3.2.6 Reporting the Findings

In Section 3.2.5, we looked at the data in different dimensions and found outliers. The analysis can become much more elaborate depending on what we want to accomplish. We only looked at outliers, but one could go even further and check relative differences between paths and browser types or status codes. The next step is reporting the findings to the development team. There might be various reasons why we do not have any hits for a certain browser type, such as issues with cascading style sheets (CSS) that make it impossible to click a link. For a status code, the service backing the path might have difficulties responding, eventually ending up in 500s:

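A hedged sketch of that reporting step; the distribution list address and file names are illustrative, and whether the HTML renders as a table depends on the local mail setup:

awk -F',' '{ printf "%-40s %-12s %8s\n", $1, $2, $3 }' outliers.csv \
  | awk 'BEGIN { print "<table>" }
         { printf "<tr><td>%s</td><td>%s</td><td>%s</td></tr>\n", $1, $2, $3 }
         END { print "</table>" }' \
  | mail -s "Daily user flow report" webshop-dev@example.com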

The findings are reported to a distribution group where the interested parties get notified. We extend our script with another awk statement that reorganizes the data into columns. The next awk statement creates an HTML table from the prepared data by looping over the rows. The mail command simply takes the output from awk and sends it as an email to the distribution list.

3.2.7 Automating the Process

We went over the process step by step and constructed a brief overview of the pages that users visit. Doing this manually every day would take time and is prone to failure. Hence, the better way to handle it is through automation. If we can run our script every day at a scheduled time, the process is easily automated. What is more, the script should be put into source control so that it is available to everyone and does not get lost. Hence, here are the next steps to be taken:

  • Write an entry point script
  • Put both entry point scripts and analysis scripts to source control
  • Add a cron job to process data every day:
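
A sketch of the crontab entry; the paths are illustrative and assume the entry point script shown below is checked out under /opt/analysis (it runs every day at midnight):

0 0 * * * /opt/analysis/run_analysis.sh >> /var/log/analysis_cron.log 2>&1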

Our entry point script is quite straightforward. First, we navigate to the directory of the script. Then, we check for updates in the repository. After that, we execute the analysis commands one by one. The analysis scripts could also be run in parallel, but there is no need to put more load on the web servers:

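A sketch of such an entry point, assuming the pipelines built earlier were saved as separate scripts in the same repository (all script names are illustrative):

#!/bin/bash
# run_analysis.sh -- entry point executed by cron.
set -euo pipefail

cd "$(dirname "$0")"   # navigate to the directory of the script
git pull --quiet       # check for updates in the repository

./collect_logs.sh      # run the per-server pipelines and download the results
./merge_results.sh     # merge the downloads and compute percentages and outliers
./send_report.sh       # build the HTML table and email the distribution list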

Since the scripts have been added to source control, the next thing is to write the cron job that executes them. Our simple cron job looks like the one shown above, under the list of steps. The analysis scripts run at midnight, so the results are emailed to the interested parties every morning.

Some small adjustments to our scripts are needed so that they only process data for the previous day; otherwise, we would end up processing all of the data every day. Moreover, there might be other parsing rules to add as well. We have used a manageable example; nevertheless, parsing can be much more laborious than the way it is handled here.

3.2.8 A Brief Review

We built our data processing platform with the help of common Unix commands. It is worth taking a moment to note the similarity between the approach we took and the MapReduce framework. Essentially, a bunch of commands were mapped to run on the web servers, and the results were reduced on another server. The processing is distributed since each command runs on a different web server. What is more, the processing could be made parallel with a couple of adjustments. In effect, the local machine acts as a coordinator and the web servers as workers. The point is that the problem was dealt with in a somewhat similar way, and we still got the results.

The solution we came up with and the MapReduce framework share a common pattern: divide et impera. Divide and conquer is a common approach used in many areas of computer science. With regard to Big Data, it is an indispensable strategy since processing so much data in one place is not feasible. Throughout this book, examples of this strategy in different environments and frameworks will be shown.

3.3 Processing Large Data with PostgreSQL

In Section 3.2, a simple reporting system was developed that informs the interested parties about several aspects of the web platform. That might be enough for some businesses or organizations, but others might need a deeper grasp. To address this, a solution is needed that offers a common interface for executing analysis tasks on the data. There are a number of such solutions; however, PostgreSQL should cater to most of the requirements. Here, PostgreSQL is used as our data processing engine for the following reasons:

  • Open source, which means no software license costs
  • Basic table partitioning
  • Common table expressions
  • Window functions for aggregating data
  • Unstructured data handling
  • Leverages multiple CPUs to answer queries faster
  • Well known with a large community

In this section, we will work on the same data as in Section 3.2. It is assumed that the data is already parsed. Some scripting will still be written to load the data into PostgreSQL, though there is no need to revisit all the steps since they were already covered.

3.3.1 Data Modeling

Previously, the data obtained from the request logs was described. Here, the goal is to create a model for that data in PostgreSQL. We will leverage PostgreSQL partitions to execute queries efficiently on given ranges.

Yet, before doing so, let us quickly visit PostgreSQL partitions. PostgreSQL partitioning splits one large table into smaller physical pieces. When PostgreSQL receives a query against a partitioned table, it uses different execution plans to take full advantage of the individual partitions. For example, when appropriate, PostgreSQL can sequentially scan a single partition instead of performing random access across the whole table. Partitions also increase the gains from indexing when a table grows large, because it is easier to fit a partition's index into memory than the whole index. Moreover, PostgreSQL partitions come in handy because bulk operations such as adding or dropping a partition are possible without a massive lock on the table. Those operations are very common in large data sets: we may need to add new partitions for each day or hour and, on the flip side, may need to delete hourly or daily partitions for retention purposes. Last but not least, partitions can be moved to other storage devices without affecting the rest:

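A sketch of such a data model; the column names and types are assumptions based on the fields described earlier, and the dbdp schema matches the one used later in the sharding section:

CREATE TABLE dbdp.request_log (
    ts              timestamptz,
    host            text,
    ip              inet,
    http_method     text,
    cookie          text,
    path            text,
    status_code     int,
    content_length  bigint,
    request_time_ms int,
    referrer        text,
    user_agent      text
) PARTITION BY RANGE (ts);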

Let us take a look at the data model. In the table, we have ts, shorthand for timestamp, as the partition column, and the rest of the columns are parsed from the request log. All of the columns are nullable, as some of them might be missing for numerous reasons. Note that it might be a good idea to check the counts occasionally to catch incomplete data.

3.3.2 Copying Data

When the data was processed with Unix tooling, most of the work was done on the web servers themselves. In this case, however, it is preferable to let PostgreSQL do the heavy lifting. Thus, we have to transfer all of the parsed data to the PostgreSQL server and copy it into PostgreSQL:

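A hedged sketch of such a copy script; the host names, file paths, and partition names are illustrative, the date arithmetic assumes GNU date, and the parsed files are assumed to be gzipped CSV:

#!/bin/bash
# load_request_logs.sh -- rotate partitions and load yesterday's parsed logs.
set -euo pipefail

PGHOST=pg.example.com
PGDB=dbdp
PGUSER=dbdp
DAY=$(date -d "yesterday" +%Y_%m_%d)
DAY_START=$(date -d "yesterday" +%Y-%m-%d)
DAY_END=$(date +%Y-%m-%d)
RETIRE=$(date -d "14 days ago" +%Y_%m_%d)

# Retention: drop the two-week-old partition and create yesterday's partition.
psql -h "$PGHOST" -U "$PGUSER" -d "$PGDB" <<SQL
DROP TABLE IF EXISTS dbdp.request_log_${RETIRE};
CREATE TABLE dbdp.request_log_${DAY}
  PARTITION OF dbdp.request_log
  FOR VALUES FROM ('${DAY_START}') TO ('${DAY_END}');
SQL

# Copy yesterday's parsed logs from every web server into the table.
while read -r host; do
  ssh -n "$host" "zcat /data/logs/parsed_${DAY}.csv.gz" \
    | psql -h "$PGHOST" -U "$PGUSER" -d "$PGDB" \
           -c "\copy dbdp.request_log FROM STDIN WITH (FORMAT csv)"
done < hosts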

In the first part of the script above, some variables are initialized to be used later. In the second part, the partition from two weeks ago is dropped for retention purposes, and a new partition is added for yesterday's data. In the loop, we go over each of the hosts and copy the data from each host to the PostgreSQL server. Note that there is no need to perform the extra user agent parsing that was done before; ideally, the user agent string is kept as it is, since it might be valuable for other purposes. When connecting to PostgreSQL, we assume either that we have a .pgpass file or that our IP is whitelisted by the PostgreSQL server for authentication purposes. The authentication part is not covered here as it is out of the scope of this book.

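With the data in place, an example query like the following hedged sketch can be run; the date range and the parallel worker setting are illustrative, and the query is discussed below:

SET max_parallel_workers_per_gather = 4;

SELECT cookie,
       count(*) AS view_count
FROM dbdp.request_log
WHERE ts >= '2020-01-14' AND ts < '2020-01-15'   -- restrict the query to one partition
GROUP BY cookie
ORDER BY view_count DESC;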

The data is now loaded into PostgreSQL, and we can run SQL queries such as the one above over the request log data. One important aspect is to limit queries to the relevant partitions; otherwise, PostgreSQL has to account for partitions that are not needed. This is a common concern for queries over Big Data. Careful writing of queries is advised, and, in most cases, they should be limited to ranges and partitions. We do not want to put an undesirable load on resources with heavy queries.

In our example query, the view count per user is calculated using the unique cookie. We also set max_parallel_workers_per_gather to speed up query execution by parallelizing the work among worker processes. And that is just a start: more detailed queries can be written to understand user behavior. We could connect a business intelligence tool to our table and bring in other tables from the production databases to join with the log data. Nevertheless, a single PostgreSQL server might not be enough for such complicated work. Thus, a brief look at a multi-server setup is worthwhile.

3.3.3 Sharding in PostgreSQL

Although one PostgreSQL server might be enough for many workloads, it might not be sufficient for jobs that involve more data. We can still stick with PostgreSQL by adding a couple of nodes to distribute the data over multiple servers. Luckily, PostgreSQL has a feature called foreign data wrappers that provides a mechanism to natively shard tables across multiple PostgreSQL servers. When we run a query on a node, the foreign data wrapper will transparently query the other nodes and return the results as if they were coming from a table in the current database.

3.3.3.1 Setting up Foreign Data Wrapper

The foreign data wrapper comes with the standard PostgreSQL distribution. The wrapper, postgres_fdw, is an extension in the distribution and can be enabled through the following command. Note that it has to be run by a database admin:

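Enabling the extension on the local (destination) server might look like this:

CREATE EXTENSION IF NOT EXISTS postgres_fdw;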

Once the extension is enabled, a server can be created. On the remote servers, we expect a database named dbdp, a user named dbdp, and a schema named dbdp to be set up. Two servers can be created as follows:

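A sketch of the two server definitions; the server names, host names, and ports are illustrative:

CREATE SERVER shard1
  FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS (host 'pg-shard1.example.com', port '5432', dbname 'dbdp');

CREATE SERVER shard2
  FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS (host 'pg-shard2.example.com', port '5432', dbname 'dbdp');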

To enable connections from the destination server to the remote servers, users on the destination server need to be mapped to users on the remote servers. The mapping can be done as follows:

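A sketch of the mapping, assuming the local role is also called dbdp (the passwords are placeholders):

CREATE USER MAPPING FOR dbdp SERVER shard1
  OPTIONS (user 'dbdp', password 'changeme');

CREATE USER MAPPING FOR dbdp SERVER shard2
  OPTIONS (user 'dbdp', password 'changeme');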

An easy way to complete the setup is by importing the desired schemas from the remote servers to the local server. The import can be done as follows:

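A sketch of the import, placing each remote dbdp schema into its own local schema (the local schema names are illustrative):

CREATE SCHEMA IF NOT EXISTS shard1;
CREATE SCHEMA IF NOT EXISTS shard2;

IMPORT FOREIGN SCHEMA dbdp FROM SERVER shard1 INTO shard1;
IMPORT FOREIGN SCHEMA dbdp FROM SERVER shard2 INTO shard2;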

Now we are done with pairing the databases. One could pair all databases together so that each of them can execute the same queries without any problem. Here, however, a master database is selected that delegates query execution to the other servers. It might be preferable to set up a stronger machine in terms of CPU and memory for the master server. Next, we can figure out how the sharding is done.

3.3.3.2 Sharding Data over Multiple Nodes

Partitioning was already used when implementing the copy operation for the request logs. As discussed earlier, it provides advantages over a traditional table with regard to large volumes of data. The next idea is to go beyond partitioning on a single server and partition the data over multiple servers. Distributed partitioning is called sharding since it involves scaling out horizontally. Sharding is required when the amount of data in the table gets close to the capacity of a single server. Besides, we might need parallel processing on multiple servers when answering analytics or reporting queries. There is just one caveat: always filter queries by partitions; otherwise, the queries will soon exhaust the database system:

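A sketch of those statements, run on the master server; the ranges are illustrative, the server names follow the earlier setup, and in this sharded layout the foreign tables take the place of the local partitions used before:

CREATE FOREIGN TABLE dbdp.request_log_2020_01
  PARTITION OF dbdp.request_log
  FOR VALUES FROM ('2020-01-01') TO ('2020-02-01')
  SERVER shard1
  OPTIONS (schema_name 'dbdp', table_name 'request_log_2020_01');

CREATE FOREIGN TABLE dbdp.request_log_2020_02
  PARTITION OF dbdp.request_log
  FOR VALUES FROM ('2020-02-01') TO ('2020-03-01')
  SERVER shard2
  OPTIONS (schema_name 'dbdp', table_name 'request_log_2020_02');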

Assume that we have created the respective tables dbdp.request_log_2020_01 and dbdp.request_log_2020_02 on the remote servers with the same definition discussed earlier. Now, a link is created between the remote servers and the local server by treating each table on a remote server as a partition, using the statements above. PostgreSQL also supports sub-partitioning, which means partitions can be made even smaller. Following our example, daily partitions on the remote servers can be made as follows:

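A sketch of the daily sub-partitions, run on a remote server and assuming dbdp.request_log_2020_01 was itself created there with PARTITION BY RANGE (ts):

CREATE TABLE dbdp.request_log_2020_01_01
  PARTITION OF dbdp.request_log_2020_01
  FOR VALUES FROM ('2020-01-01') TO ('2020-01-02');

CREATE TABLE dbdp.request_log_2020_01_02
  PARTITION OF dbdp.request_log_2020_01
  FOR VALUES FROM ('2020-01-02') TO ('2020-01-03');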

With all the sharding and partitioning, we have a distributed PostgreSQL cluster that can be scaled out horizontally. Nevertheless, it requires additional development and maintenance to support partitions and sharding. Plugins can be used for partition management, or partition creation can be automated through a scheduler; however, that would require additional effort. When the setup gets close to the complexity of a Big Data system, it might be a good idea to consider more scalable options.

3.4 Cost of Big Data

Running Big Data systems can be operationally very costly. Before jumping on the bandwagon, it is better to evaluate other options: start with the most basic solution and add to it until it is no longer sufficient for the company's needs. As you can see from the examples, very basic reporting can be obtained with Unix tools alone. When that was not enough for thorough analysis, we moved to a PostgreSQL solution and scaled it out horizontally. When the management of the servers gets tricky enough or the desired performance cannot be achieved, Big Data solutions should be considered.

Big Data tools require a set of expertise that might not be common in most development stacks. It is hard to find and retain talent with Big Data skills, so the use of Big Data systems also comes with the problem of expertise. One can outsource Big Data system deployment and maintenance, and it might be a wise option. Nonetheless, both in-house talent and outsourcing come at an additional expense. If the company can cope with a more manageable setup, it might be worth going that route before going down the rabbit hole.

This is not to suggest turning down Big Data systems altogether; rather, the requirements should be evaluated and more comfortable alternatives chosen where possible. Of course, if a company gains a significant competitive advantage from using more diverse and near real-time data, then it is more reasonable to invest in Big Data infrastructure.
