
7. Analytics at Scale


In recent decades, big data has driven a revolutionary change in analytics technology. Data is now collected from a wide variety of sources, and technologies have been developed to analyze this data in a distributed environment, even in real time.

Hadoop

The revolution started with the development of the Hadoop framework, which has two major components, namely, MapReduce programming and the HDFS file system.

MapReduce Programming

MapReduce is a programming style inspired by functional programming for dealing with large amounts of data. A programmer can process big data using MapReduce code without knowing the internals of the distributed environment. Before MapReduce, frameworks such as Condor performed parallel computing on distributed data. The main advantage of MapReduce, however, is data locality: rather than moving the data, the framework ships the code (via remote procedure calls) to the machines where the data resides. For big data, this is a huge saving in network bandwidth as well as computation time.

A MapReduce program has two major components: the mapper and the reducer. In the mapper, the input is split into small units; generally, each line of the input file becomes an input for one map job. The mapper processes the input and emits key-value pairs to the reducer. The reducer receives all the values for a particular key as input and processes them to produce the final output.

The following pseudocode is an example of counting the frequency of words in a document:
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
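
To make the pseudocode concrete, here is a small, self-contained Python sketch (not tied to any Hadoop API) that simulates the map, shuffle, and reduce phases in memory:
# A runnable simulation of the word-count job: map each line to (word, "1")
# pairs, group the pairs by key (the shuffle), then reduce each group to a count.
from collections import defaultdict

def map_task(line):
    return [(word, "1") for word in line.split()]

def reduce_task(key, values):
    return key, str(sum(int(v) for v in values))

lines = ["the quick brown fox", "the lazy dog", "the fox"]

shuffled = defaultdict(list)
for line in lines:                        # map phase
    for key, value in map_task(line):
        shuffled[key].append(value)       # shuffle: group values by key

print([reduce_task(k, vs) for k, vs in shuffled.items()])
# e.g. [('the', '3'), ('quick', '1'), ('brown', '1'), ('fox', '2'), ('lazy', '1'), ('dog', '1')]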

Partitioning Function

Sometimes it is necessary to send a particular set of keys to a particular reduce job; the partitioning function serves this purpose. For example, in the previous MapReduce example, say the user wants the output stored in sorted order. The user then sets the number of reduce jobs to 26, one per letter of the alphabet, and the partitioner returns 0 for keys starting with a, 1 for keys starting with b, and so on. All words that start with the same letter then go to the same reduce job and are written to the same output file. Because MapReduce guarantees that the intermediate key-value pairs within a given partition are processed in increasing key order, the concatenated output files are globally sorted.
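
As a language-neutral illustration (a minimal Python sketch, not the Hadoop Partitioner API), the partitioning logic described here can be written as follows:
import string

NUM_REDUCERS = 26  # one reduce job per letter of the alphabet

def partition(key, num_reducers=NUM_REDUCERS):
    # Route a key to a reducer based on its first letter, so each reducer
    # receives an alphabetically contiguous range of words.
    first = key[0].lower()
    if first in string.ascii_lowercase:
        return string.ascii_lowercase.index(first) % num_reducers
    return 0  # non-alphabetic keys all go to the first reducer

print(partition("apple"), partition("banana"), partition("zebra"))  # 0 1 25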

Combiner Function

The combiner is a facility in MapReduce that performs partial aggregation in the map phase. Not only does it improve performance, but it can be essential when the data set is so huge that the reducer would otherwise fail with memory errors. Usually the reducer and combiner logic are the same, but they may need to differ depending on how the reducer handles its output.
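
The effect of a combiner can be sketched in plain Python: the map output for one input split is aggregated locally before anything is sent over the network, so the reducer receives one partial count per word per split instead of one pair per occurrence. The sketch assumes the same word-count job as before.
from collections import Counter

def map_with_combiner(line):
    pairs = [(word, 1) for word in line.split()]   # raw map output
    combined = Counter()
    for word, count in pairs:                      # combiner: local aggregation
        combined[word] += count
    return list(combined.items())                  # partial counts sent to reducers

print(map_with_combiner("to be or not to be"))
# [('to', 2), ('be', 2), ('or', 1), ('not', 1)]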

To implement this word count example, we will follow a particular design pattern. There will be a root RootBDAS class (BDAS stands for Big Data Analytic System) with two abstract methods: a mapper task and a reducer task. All child classes implement these mapper and reducer tasks. The main class creates an instance of the chosen child class using reflection; the MapReduce map function calls the instance's mapper task, and the reduce function calls its reducer task. The major advantages of this pattern are that the MapReduce functionality can be unit tested and that the design is adaptive: adding a new child class requires no changes to the main class or the unit tests, only a configuration change. Some jobs may also need to implement combiner or partitioner logic; those classes implement the ICombiner or IPartitioner interface.

Figure 7-1 shows a class diagram of the system.

Figure 7-1. The class diagram: the main class and the unit tester use the child classes, which extend the abstract RootBDAS class and implement the interfaces

Here is the RootBDAS class:
import java.util.ArrayList;
import java.util.HashMap;
/**
 *
 */
/**
 * @author SayanM
 *
 */
public abstract class RootBDAS {
       abstract  HashMap<String, ArrayList<String>>  mapper_task(String line);
       abstract  HashMap<String, ArrayList<String>>  reducer_task(String key, ArrayList<String> values);
}
Here is the child class:
import java.util.ArrayList;
import java.util.HashMap;
/**
 *
 */
/**
 * @author SayanM
 *
 */
public final class WordCounterBDAS extends RootBDAS{
       @Override
       HashMap<String, ArrayList<String>> mapper_task(String line) {
              // TODO Auto-generated method stub
              String[] words = line.split(" ");
              HashMap<String, ArrayList<String>> result = new HashMap<String, ArrayList<String>>();
              for(String w : words)
              {
                    if(result.containsKey(w))
                    {
                           ArrayList<String> vals = result.get(w);
                           vals.add("1");
                           result.put(w, vals);
                    }
                    else
                    {
                           ArrayList<String> vals = new ArrayList<String>();
                           vals.add("1");
                           result.put(w, vals);
                    }
              }
              return result;
       }
       @Override
       HashMap<String, ArrayList<String>> reducer_task(String key, ArrayList<String> values) {
              // TODO Auto-generated method stub
              HashMap<String, ArrayList<String>> result = new HashMap<String, ArrayList<String>>();
              ArrayList<String> tempres = new ArrayList<String>();
              tempres.add(values.size()+ "");
              result.put(key, tempres);
              return result;
       }
}
Here is the MainBDAS class:
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 *
 */
/**
 * @author SayanM
 *
 */
public class MainBDAS {
       public static class MapperBDAS extends Mapper<LongWritable, Text, Text, Text> {
              protected void map(LongWritable key, Text value, Context context)
                           throws IOException, InterruptedException {
                     String classname = context.getConfiguration().get("classname");
                     try {
                           RootBDAS instance = (RootBDAS) Class.forName(classname).getConstructor().newInstance();
                           String line = value.toString();
                           HashMap<String, ArrayList<String>> result = instance.mapper_task(line);
                           for(String k : result.keySet())
                           {
                                  for(String v : result.get(k))
                                  {
                                        context.write(new Text(k), new Text(v));
                                  }
                           }
                     } catch (Exception e) {
                           // TODO Auto-generated catch block
                           e.printStackTrace();
                    }
                    }
       }
       public static class ReducerBDAS extends Reducer<Text, Text, Text, Text> {
              protected void reduce(Text key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
                    String classname = context.getConfiguration().get("classname");
                    try {
                           RootBDAS instance = (RootBDAS) Class.forName(classname).getConstructor().newInstance();
                           ArrayList<String> vals = new ArrayList<String>();
                           for(Text v : values)
                           {
                                  vals.add(v.toString());
                           }
                           HashMap<String, ArrayList<String>> result = instance.reducer_task(key.toString(), vals);
                           for(String k : result.keySet())
                           {
                                  for(String v : result.get(k))
                                  {
                                         context.write(new Text(k), new Text(v));
                                  }
                           }
                     } catch (Exception e) {
                           // TODO Auto-generated catch block
                           e.printStackTrace();
                     }
              }
       }
       public static void main(String[] args) throws Exception {
              // TODO Auto-generated method stub
           String classname = Utility.getClassName(Utility.configpath);
              Configuration con = new Configuration();
              con.set("classname", classname);
              Job job = new Job(con);
              job.setJarByClass(MainBDAS.class);
              job.setJobName("MapReduceBDAS");
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(Text.class);
              job.setInputFormatClass(TextInputFormat.class);
              job.setOutputFormatClass(TextOutputFormat.class);
              FileInputFormat.setInputPaths(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));
              job.setMapperClass(MapperBDAS.class);
              job.setReducerClass(ReducerBDAS.class);
              System.out.println(job.waitForCompletion(true));
        }
}
To test the example, you can use this unit testing class:
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.Test;
public class testBDAS {
       @Test
       public void testMapper() throws Exception{
              String classname = Utility.getClassName(Utility.testconfigpath);
              RootBDAS instance = (RootBDAS) Class.forName(classname).getConstructor().newInstance();
              String line = Utility.getMapperInput(Utility.testconfigpath);
              HashMap<String, ArrayList<String>> actualresult = instance.mapper_task(line);
              HashMap<String, ArrayList<String>> expectedresult = Utility.getMapOutput(Utility.testconfigpath);
              for(String key : actualresult.keySet())
              {
                    boolean haskey = expectedresult.containsKey(key);
                    assertEquals(true, haskey);
                    ArrayList<String> actvals = actualresult.get(key);
                    for(String v : actvals)
                    {
                           boolean hasval = expectedresult.get(key).contains(v);
                           assertEquals(true, hasval);
                    }
              }
       }
       @Test
       public void testReducer(){
              fail();
       }
}
Finally, here are the interfaces:
import java.util.ArrayList;
import java.util.HashMap;
public interface ICombiner {
       HashMap<String, ArrayList<String>>  combiner_task(String key, ArrayList<String> values);
}
public interface IPartitioner {
       public int  partitioner_task(String line);
}

HDFS File System

Besides MapReduce, HDFS is the second major component of the Hadoop framework. It is designed to handle big data in a distributed environment on general-purpose, low-cost hardware. HDFS borrows its design from the POSIX file system but relaxes some of its requirements, with the goal of dealing with streaming data.

A Hadoop cluster consists of two types of nodes: the name node and the data nodes. The name node stores the metadata and controls execution, acting as the master of the cluster. The data nodes do the actual work; they act as slaves, performing the instructions sent by the name node.

MapReduce Design Pattern

MapReduce is a paradigm for processing data that resides on hundreds of computers. Some design patterns are common in MapReduce programming.

Summarization Pattern

In the summarization pattern, the reducer creates a summary for each key (see Figure 7-2; a short Python sketch follows the figure). The partitioner can be used if you want to sort the data or for any other purpose. Word count is an example of the summarization pattern. This pattern can also be used to find the minimum, maximum, and count of the data, or the average, median, and standard deviation.

Figure 7-2. Details of the summarization pattern: the mapper emits a key and a summary field, the partitioner groups them, and the reducer produces one summary per group
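A minimal Python sketch of the summarization pattern (illustrative data only): the mapper emits (group key, numeric value) pairs, and the reducer produces one summary per key.
from collections import defaultdict

records = [("web", 120), ("web", 80), ("api", 30), ("api", 45), ("api", 15)]

groups = defaultdict(list)
for key, value in records:            # shuffle: group values by key
    groups[key].append(value)

def summarize(key, values):           # reducer: one summary row per key
    return {"key": key, "count": len(values), "min": min(values),
            "max": max(values), "mean": sum(values) / len(values)}

for key, values in groups.items():
    print(summarize(key, values))
# {'key': 'web', 'count': 2, 'min': 80, 'max': 120, 'mean': 100.0}
# {'key': 'api', 'count': 3, 'min': 15, 'max': 45, 'mean': 30.0}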

Filtering Pattern

In MapReduce, filtering is done in a divide-and-conquer way (Figure 7-3; a short Python sketch follows the figure). Each mapper filters a subset of the data, and the reducer aggregates the filtered subsets into the final output. Generating the top N records, searching data, and sampling data are common use cases of the filtering pattern.

Figure 7-3. Details of the filtering pattern: each input split passes through a filter mapper to its own output split
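Here is a minimal Python sketch of the filtering pattern applied to a top-N query (illustrative data only): each mapper keeps only its local top N records, and a single reducer merges the local winners into the global top N.
import heapq

N = 3
splits = [[12, 5, 90, 33], [7, 64, 2, 88], [41, 9, 73, 15]]    # simulated input splits

local_tops = [heapq.nlargest(N, split) for split in splits]    # map-side filtering
global_top = heapq.nlargest(N, (x for top in local_tops for x in top))  # reduce side

print(local_tops)    # [[90, 33, 12], [88, 64, 7], [73, 41, 15]]
print(global_top)    # [90, 88, 73]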

Join Patterns

In MapReduce, joining (Figure 7-4) can be done on the map side or the reduce side. A map-side join is possible when the data sets to be joined are already partitioned and sorted on the join key (or one of them is small enough to be replicated to every mapper); otherwise, a reduce-side join is required. The join can be an inner join, an outer join, or an anti-join.

Figure 7-4. Details of the join pattern: mappers 1 and 2 feed a common reducer, which emits each key with the values from both inputs

The following code is an example of a reduce-side join:
package MapreduceJoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
@SuppressWarnings("deprecation")
public class MapreduceJoin {
////////////////////////////////////////////////////////
       @SuppressWarnings("deprecation")
       public static class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text>
       {
              public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
              {
                     ArrayList<String> translist = new ArrayList<String>();
                     String secondvalue = "";
                     while (values.hasNext())
                     {
                           String currValue = values.next().toString().trim();
                           if(currValue.contains("trans:")){
                                   String[] temp = currValue.split("trans:");
                                   if(temp.length > 1)
                                         translist.add(temp[1]);
                            }
                            if(currValue.contains("sec:"))
                            {
                                   String[] temp = currValue.split("sec:");
                                   if(temp.length > 1)
                                          secondvalue = temp[1];
                             }
                      }
                      for(String trans : translist)
                      {
                            output.collect(key, new Text(trans +' ' + secondvalue));
                     }
               }
       }
       ////////////////////////////////////////////////////////
       @SuppressWarnings("deprecation")
       public static class TransactionMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
       {
             int index1 = 0;
             public void configure(JobConf job) {
                    index1 = Integer.parseInt(job.get("index1"));
             }
             public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
              {
                    String line = value.toString().trim();
                     if(line.isEmpty()) return;
               String splitarray[] = line.split(" ");
               String id = splitarray[index1].trim();
               String ids = "trans:" + line;
               output.collect(new Text(id), new Text(ids));
             }
       }
       ////////////////////////////////////////////////////////
       @SuppressWarnings("deprecation")
       public static class SecondaryMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
       {
             int index2 = 0;
             public void configure(JobConf job) {
                    index2 = Integer.parseInt(job.get("index2"));
             }
             public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
             {
                    String line = value.toString().trim();
                     if(line.isEmpty()) return;
               String splitarray[] = line.split(" ");
               String id = splitarray[index2].trim();
               String ids = "sec:" + line;
               output.collect(new Text(id), new Text(ids));
             }
       }
   ////////////////////////////////////////////////////////////
       @SuppressWarnings({ "deprecation", "rawtypes", "unchecked" })
       public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              // TODO Auto-generated method stub
              JobConf conf = new JobConf();
              conf.set("index1", args[3]);
              conf.set("index2", args[4]);
              conf.setReducerClass(JoinReducer.class);
              MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, (Class<? extends org.apache.hadoop.mapred.Mapper>) TransactionMapper.class);
              MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, (Class<? extends org.apache.hadoop.mapred.Mapper>) SecondaryMapper.class);
              Job job = new Job(conf);
              job.setJarByClass(MapreduceJoin.class);
              job.setJobName("MapReduceJoin");
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(Text.class);
              FileOutputFormat.setOutputPath(job, new Path(args[2]));
              System.out.println(job.waitForCompletion(true));
       }
}

A Note on Functional Programming

MapReduce is rooted in functional programming. Functional programming is a style of programming in which all variables are immutable, which makes parallelization easy and avoids race conditions. Standard Python supports a functional programming style.

Here is an example:
employees = [{
    'name': 'Jane',
    'salary': 90000,
    'job_title': 'developer'
}, {
    'name': 'Bill',
    'salary': 50000,
    'job_title': 'writer'
}, {
    'name': 'Kathy',
    'salary': 120000,
    'job_title': 'executive'
}, {
    'name': 'Anna',
    'salary': 100000,
    'job_title': 'developer'
}, {
    'name': 'Dennis',
    'salary': 95000,
    'job_title': 'developer'
}, {
    'name': 'Albert',
    'salary': 70000,
    'job_title': 'marketing specialist'
}]
# find the average salary of developers
from statistics import mean
developer_salary = mean(map(lambda e: e['salary'],
                            filter(lambda e: e['job_title'] == 'developer', employees)))
print(developer_salary)  # 95000.0

Spark

After Hadoop, Spark is the next revolution in big data technology. The major advantage of Spark is that it provides a unified interface to the entire big data stack. Previously, if you needed a SQL-like interface for big data, you used Hive; for real-time data processing, Storm; and for building a machine learning model, Mahout. Spark brings all these facilities under one umbrella. In addition, it enables in-memory computation on big data, which makes processing very fast. Figure 7-5 shows the components of Spark.

Figure 7-5. The components of Spark: Spark SQL (structured data), Spark Streaming (real time), MLlib (machine learning), GraphX, and the scheduler on top of Spark Core, running on YARN, Mesos, or stand-alone

Spark Core is the fundamental component of Spark. It can run on top of Hadoop or stand-alone. It abstracts the data set as a resilient distributed dataset (RDD). An RDD is a collection of read-only objects; because it is read-only, there are no synchronization problems when it is shared across multiple parallel operations. Operations on RDDs are lazy and come in two kinds: transformations and actions. A transformation does not trigger any execution on the data set; Spark only records the sequence of operations as a directed acyclic graph called the lineage. When an action is called, the actual execution takes place. After the first execution, the result is cached in memory, so when a new action is called, Spark traverses the lineage graph and reuses as much of the previous computation as possible, minimizing the work needed for the new operation. This makes data processing very fast and also makes the data fault tolerant: if any node fails, Spark consults the lineage graph for the data on that node and easily reproduces it.

One limitation of the Hadoop framework is that it has no message-passing interface for parallel computation, yet there are several use cases where parallel jobs need to talk to each other. Spark addresses this with two kinds of shared variables: the broadcast variable and the accumulator. When one job needs to send a message to all other jobs, it uses a broadcast variable; when multiple jobs want to aggregate their results in one place, they use an accumulator. An RDD splits its data set into units called partitions. Spark provides an interface to specify the partitioning of the data, which is very effective for later operations such as join or find, and the user can specify the storage type of a partition. Spark has programming interfaces in Python, Java, and Scala. The following code is an example of a word count program in Spark (in Scala):
val conf = new SparkConf().setAppName("wiki_test") // create a spark config object
val sc = new SparkContext(conf) // Create a spark context
val data = sc.textFile("/path/to/somedir") // Read files from "somedir" into an RDD of (filename, content) pairs.
val tokens = data.flatMap(_.split(" ")) // Split each file into a list of tokens (words).
val wordFreq = tokens.map((_, 1)).reduceByKey(_ + _) // Add a count of one to each token, then sum the counts per word type.
wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) // Get the top 10 words. Swap word and count to sort by count.
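Since this book focuses on Python, here is a rough PySpark equivalent of the same word count (a sketch assuming an existing SparkContext named sc and a hypothetical input path):
data = sc.textFile("/path/to/somedir")                         # RDD of lines
tokens = data.flatMap(lambda line: line.split(" "))            # split each line into words
word_freq = tokens.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(word_freq.takeOrdered(10, key=lambda pair: -pair[1]))    # top 10 words by count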
On top of Spark Core, Spark provides the following:
  • Spark SQL, which is a SQL interface through the command line or a database connector interface. It also provides a SQL interface for the Spark data frame object.

  • Spark Streaming, which enables you to process streaming data in real time.

  • MLlib, a machine learning library to build analytical models on Spark data.

  • GraphX, a distributed graph processing framework.

PySpark

PySpark is a Python interface for Apache Spark. Using simple Python APIs, we can write Spark applications. All the major features of Spark, namely, Spark SQL, MLlib, and the DataFrame API, are supported by PySpark. The following are the installation steps:
  1. First, we need the Java Runtime Environment (JRE) to run Spark. It is recommended to install the JRE from https://adoptium.net/temurin/releases/?version=8, although you can also install a JRE from another distribution. The installation usually creates JAVA_HOME automatically. If it does not, we need to set JAVA_HOME as a system variable, pointing to the Java installation folder, such as JAVA_HOME=C:\Program Files\Eclipse Adoptium\jdk-8.0.345.1-hotspot.

  2. Then we need to install PySpark using the command pip install pyspark. For this we need Python 3.7 or above installed. After that, we need to set the environment variable PYSPARK_PYTHON to the location of the Python executable, for example C:\Users\...\Programs\Python\Python39\python.exe.

  3. Also, install PyArrow by using pip install pyarrow to use the Pandas API in PySpark.
Now, we are set to write the code in PySpark. The CSV file is the same as the one we used earlier.
from pyspark.sql import SparkSession
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
import pyspark.pandas as ps
import numpy as np
# create a Spark session
spark = SparkSession.builder.getOrCreate()
print(spark)
# Create series
s = ps.Series([11, np.nan, 51, 16, 82])
print(s)
# Create data frame
pandas_df = pd.DataFrame({
    'C1': [11, 22, 31],
    'C2': [12., 13., 14.],
    'C3': ['text1', 'text2', 'text3'],
    'C4': [date(2010, 1, 1), date(2011, 2, 1), date (2012, 3, 1)]
})
df = spark.createDataFrame(pandas_df)
df.show()
df.printSchema()
# Read a CSV file using Spark
df = spark.read.csv('book/ch4/CC_GENERAL.csv', header=True)
df.show()
psdf = df.pandas_api()
print(psdf.describe())

Updatable Machine Learning and Spark Memory Model

In this section, we discuss a new machine learning topic: how to develop an updatable machine learning model, which is a basic requirement for a model to become scalable.

When we train a model in machine learning, we usually train it from scratch, which means that all the information from prior training iterations is discarded. For example, if we train a model with data from the first week of the month, it predicts based on the first week's data; if we then train the same model object with data from the second week, it begins to forecast based only on the second week's data, and no trace of the first week's data remains in the model. In the real world, however, there are many scenarios where we need the model to forecast based on both first- and second-week data after training, which implies that each training iteration should update the model rather than erase prior training information.

This type of model is possible when the model is built from a function X that is additive, that is, for any two data sets N1 and N2:

X(N1) + X(N2) = X(N1 + N2)

Examples of this type of function are count, max, min, and sum. Any probability-based model can be decomposed into sums and counts and therefore converted into an updatable model. The Bayesian classifier was used as the model in our application, and the Spark code that follows is our own implementation of the classifier.

The algorithm’s goal is to determine the conditional probability that a node (a video or piece of content) will be watched for a specific feature value.

Assume we’re calculating the score for node 10033207 with the feature identifier (device ID) and the feature value 49 (you can choose any device ID).
  • Feature count = Number of records where that device ID (49) is present

  • Conditional count = Number of records with is_completed equal to 1, identifier equal to 49, and node equal to 10033207

The ratio of the conditional count to the feature count represents the actual probability, but we do not perform the division at this stage, because then the model would no longer be updatable.
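
The idea can be sketched in plain Python (the field names are illustrative, not taken from the production code): only raw counts are stored, so training on a new batch simply adds to the existing counts, and the probability is computed at query time.
from collections import Counter

feature_count = Counter()      # occurrences of each (feature, value)
conditional_count = Counter()  # positive occurrences per (feature, value, node)

def update(batch):
    # Each new batch only adds to the stored counts, so X(N1) + X(N2) = X(N1 + N2).
    for row in batch:
        feature_count[(row["feature"], row["value"])] += 1
        if row["is_watched"] == 1:
            conditional_count[(row["feature"], row["value"], row["node"])] += 1

def score(feature, value, node):
    # The division happens only at query time, keeping the stored model updatable.
    denom = feature_count[(feature, value)]
    return conditional_count[(feature, value, node)] / denom if denom else 0.0

update([{"feature": "device_id", "value": "49", "node": "10033207", "is_watched": 1}])
update([{"feature": "device_id", "value": "49", "node": "10033207", "is_watched": 0}])
print(score("device_id", "49", "10033207"))  # 0.5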

In each execution of the following code, the following happens:
  1. Create a user-defined function to convert IP addresses to geographic locations.

  2. Read the name of the target channel from the config file.

  3. Load the lines from the application server log into a data frame, excluding any line that contains newlog.

  4. Filter the log lines that include the channel name.

  5. Read the important column names from the config and select them from the data frame.

  6. Connect to MySQL and read the threshold timestamp from the threshold table.

  7. Filter the data frame to rows whose timestamp is greater than the threshold, take the maximum timestamp of the selected log lines, and store it in MySQL as the new threshold for the next iteration.

  8. Save the content or node IDs that were clicked in the database.

  9. Store the popular contents or nodes in the database.

  10. Obtain each user's next watched node by self-joining on the user's previous watched node.

  11. Calculate the ratio of watch time to video duration as a score.

  12. If the score is greater than the channel's threshold defined in the config, mark it 1; otherwise, mark it 0.

  13. Then, as described in Chapter 3, calculate the conditional probability of a user watching a video longer than the threshold, stored in the form of counts.
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import lit
from pyspark.sql.functions import trim
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark.sql
import GeoIP
from functools import reduce  # reduce is not a built-in in Python 3
def ip_to_geo(ip):
    gi = GeoIP.open("/home/hadoop/GeoLiteCity.dat", GeoIP.GEOIP_STANDARD)
    gir = gi.record_by_name(ip)
    del gi
    if gir is not None:
        if gir['city'] is None:
            gir['city']=' '
        if gir['region_name'] is None:
            gir['region_name']=' '
        if gir['country_name'] is None:
            gir['country_name']=' '
        return gir['city'] + ',' + gir['region_name'] + ',' +  gir['country_name']
    else:
        return " , , "
geo_udf = udf(ip_to_geo,StringType())
sc = SparkContext(appName="BuildModelContent")
sqlContext = SQLContext(sc)
channel_config = sqlContext.read.json('/tmp/config_channels.json')
channels_list = channel_config.select("channels").collect()
channels = [(row.channels) for row in channels_list]
lines = sc.textFile("/tmp/log2.txt")
rows = lines.filter(lambda line: 'newlog' not in line)
config = sqlContext.read.json('/tmp/config.json', multiLine=True)
lim = config.select('limit').collect()[0]['limit']
rows = rows.filter(lambda row: any(c in row for c in channels))
rdd = rows.map(lambda x: x.split('|'))
#r_f = rdd.first()
df_log = rdd.toDF()     #filter(lambda row: len(row) == len(r_f)).toDF()
rdd.unpersist()
sqlContext.clearCache()
df_log.show()
parameter_config = sqlContext.read.json('/tmp/config_parameters.json')
parameter_config.show()
column_index_list = parameter_config.select("Index").collect()
column_name_list = parameter_config.select("Name").collect()
column_index =  [col(row.Index) for row in column_index_list]
column_name =  [(row.Name) for row in column_name_list]
print(column_index)
print(column_name)
df_log_reduced = df_log.select(column_index)
df_log.unpersist()
sqlContext.clearCache()
oldColumns = df_log_reduced.schema.names
newColumns = column_name
df_log_reduced = reduce(lambda df_log_reduced, idx: df_log_reduced.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df_log_reduced)
df_log_reduced.show()
df_log_reduced = df_log_reduced.filter(df_log_reduced.timestamp.isNotNull())
df_log_reduced.show()
print(df_log_reduced.count())
df_log_reduced = df_log_reduced.withColumn("timestamp",trim(col("timestamp")))
df_log_reduced = df_log_reduced.filter(df_log_reduced.timestamp.isNotNull())
df_log_reduced = df_log_reduced.withColumn("timestamp", df_log_reduced["timestamp"].cast(LongType()))
df_log_reduced.show()
print(df_log_reduced.count())
db_config = sqlContext.read.format('csv').options(delimiter=' ').load("/tmp/dbconfig.txt")
db_config.show()
hostname = db_config.where(db_config._c0 == "hostname").select("_c1").rdd.flatMap(list).first()
username = db_config.where(db_config._c0 == "username").select("_c1").rdd.flatMap(list).first()
passwd = db_config.where(db_config._c0 == "passwd").select("_c1").rdd.flatMap(list).first()
dbname = db_config.where(db_config._c0 == "dbname").select("_c1").rdd.flatMap(list).first()
time_threshold = sqlContext.read.format('jdbc').options(url="jdbc:mysql://"+ hostname + '/' + dbname,driver='com.mysql.jdbc.Driver',dbtable='threshold',user=username,password=passwd).load()
time_threshold = time_threshold.withColumn("threshold", time_threshold["threshold"].cast(LongType()))
threshold = time_threshold.where(time_threshold.job == "BuildModelContent").select("threshold").rdd.flatMap(list).first()
print(threshold)
print(lim)
df_log_reduced = df_log_reduced[df_log_reduced.timestamp > threshold].limit(lim)
df_log_reduced.show()
print(df_log_reduced.count())
status = "Regular job is continued"
if df_log_reduced.count() < lim:
    status = "Regular job is finished"
if df_log_reduced.count() == 0:
    f = open('/home/hadoop/status.txt','w')
    f.write("Regular job is finished")
    f.close()
    exit()
threshold = df_log_reduced.agg({"timestamp": "max"}).collect()[0]["max(timestamp)"]
print(threshold)
df_log_reduced = df_log_reduced.drop("timestamp")
time_threshold1 = time_threshold.toDF("job","threshold")
time_threshold.unpersist()
time_threshold1.show()
time_threshold1 = time_threshold1.withColumn("threshold", F.when(time_threshold1.job == "BuildModelContent",threshold).otherwise(F.lit(0)))
time_threshold1.show()
print(time_threshold1.count())
time_threshold1.write.format('jdbc').options(url="jdbc:mysql://"+ hostname + '/' + dbname,driver='com.mysql.jdbc.Driver',dbtable='threshold1',user=username,password=passwd).mode('overwrite').save()
split_col = pyspark.sql.functions.split(df_log_reduced['Identifier'], ':')
df_log_reduced = df_log_reduced.withColumn('Identifier', split_col.getItem(3))
df_log_reduced.show()
df_log_reduced = df_log_reduced.withColumn("Node",trim(col("Node")))
click_data = df_log_reduced.select("Node","Identifier","NextNode")
click_data = click_data[ click_data.Node == ""]
click_data = click_data.drop("Node")
click_data = click_data.toDF("Identifier","Node")
#click_data.show(click_data.count(),False)
click_data.write.format('jdbc').options(url="jdbc:mysql://"+ hostname + '/' + dbname,driver='com.mysql.jdbc.Driver',dbtable='click_info',user=username,password=passwd).mode('append').save()
node_popular = df_log_reduced.select("NextNode","watch_time")
node_popular = node_popular.toDF("Node","watch_time")
node_popular = node_popular.groupby("Node").agg(F.sum("watch_time"))
node_popular.show()
node_popular.write.format('jdbc').options(url="jdbc:mysql://"+ hostname + '/' + dbname,driver='com.mysql.jdbc.Driver',dbtable='node_popular',user=username,password=passwd).mode('overwrite').save()
node_view = df_log_reduced.select("NextNode","Identifier")
node_view = node_view.toDF("Node","Identifier")
node_view = node_view.withColumn("Identifier", F.lit(1))
node_view = node_view.groupby("Node").agg(F.sum("Identifier"))
node_view.write.format('jdbc').options(url="jdbc:mysql://"+ hostname + '/' + dbname,driver='com.mysql.jdbc.Driver',dbtable='node_view',user=username,password=passwd).mode('overwrite').save()
df_log_reduced = df_log_reduced.withColumn("Node",trim(col("Node")))
df_log_reduced = df_log_reduced.withColumn("NextNode",trim(col("NextNode")))
df_log_reduced1 = df_log_reduced.select('Identifier','NextNode','Node')
df_log_reduced1 = df_log_reduced1.toDF('Identifier1','Node1', 'prevNode')
df_log_reduced = df_log_reduced.join(df_log_reduced1,((df_log_reduced['Node'] == df_log_reduced1['Node1']) & (df_log_reduced['Identifier'] == df_log_reduced1['Identifier1']) &(df_log_reduced['watch_time'] != '')),'inner')
df_log_reduced1.unpersist()
sqlContext.clearCache()
df_log_reduced = df_log_reduced.drop('Node1', 'NextNode','Identifier1')
sqlContext.clearCache()
df_log_reduced.show()
df_log = df_log_reduced.withColumn("score", (F.col("watch_time") / F.col("Duration")))
df_log_reduced.unpersist()
sqlContext.clearCache()
threshold_config = sqlContext.read.json('/tmp/config_threshold.json')
threshold_config.show()
for row in threshold_config.collect():
    df_log = df_log.withColumn("is_watched", F.when((df_log.channel_id == row.channel) & (df_log.score > row.threshold) ,F.lit(str(1))).otherwise(F.lit(str(0))))
df_log = df_log.drop('watch_time')
df_log = df_log.drop("Duration")
df_log = df_log.drop("score")
df_log = df_log.withColumn("ip",geo_udf(df_log["ip"]))
split_col = pyspark.sql.functions.split(df_log['ip'], ',')
df_log = df_log.withColumn('city', split_col.getItem(0))
df_log = df_log.withColumn('region', split_col.getItem(1))
df_log = df_log.withColumn('country', split_col.getItem(2))
df_log = df_log.drop('ip')
df_log.show()
sqlContext.clearCache()
for co in df_log.columns:
    if str(co) != 'is_watched' and str(co) != 'Node':
        feature_prob = df_log.groupby(co).count()
        feature_prob = feature_prob.withColumn('featureCount', col('count'))
        feature_prob.show()
        cond_prob = df_log.groupby(['is_watched', str(co),'Node']).count()
        cond_prob = cond_prob.withColumn('jointCount', col('count'))
        cond_prob_joined = cond_prob.join(feature_prob,str(co))
        cond_prob.unpersist()
        feature_prob.unpersist()
        sqlContext.clearCache()
        cond_prob = cond_prob_joined[cond_prob_joined.is_watched != str(0)]
        cond_prob_joined.unpersist()
        sqlContext.clearCache()
        cond_prob = cond_prob.drop('is_watched')
        cond_prob = cond_prob.withColumn('featurevalue', col(str(co)))
        cond_prob = cond_prob.drop(str(co))
        cond_prob = cond_prob.drop('count')
        cond_prob = cond_prob.withColumn('featurename', lit(str(co)))
        cond_prob.show()
        cond_prob.write.format('jdbc').options(url="jdbc:mysql://"+ hostname + '/' + dbname,driver='com.mysql.jdbc.Driver',dbtable='score_rec',user=username,password=passwd).mode('append').save()
        cond_prob.unpersist()
        sqlContext.clearCache()
df_log.unpersist()
sqlContext.clearCache()
sc.stop()
f = open('/home/hadoop/status.txt','w')
f.write(status)
f.close()
The config files in HDFS are given here:
Config_channels.json
{"channels" :  "moviesbyfawesome"}
{"channels" : "gousa"}
Config_drop.json
{"drop" : "watch_time"}
{"drop" : "ip"}
{"drop" : "timestamp"}
Config_parameters.json
{"Name": "Identifier", "Index" : "_1"}
{"Name": "NextNode", "Index" : "_4"}
{"Name": "timestamp", "Index" : "_6"}
{"Name": "Node", "Index" : "_20"}
{"Name": "ContentType", "Index" : "_21"}
{"Name": "Author", "Index" : "_24"}
{"Name": "Platform-Channelname", "Index" : "_7"}
{"Name": "ip", "Index" : "_9"}
{"Name": "watch_time", "Index" : "_26"}
{"Name": "Duration", "Index" : "_25"}
{"Name": "channel_id", "Index" : "_8"}
Config_threshold.json
{"channel" : "all", "threshold" : 0.5 }
{"channel" : "236", "threshold" : 0.2}
{"channel" : "922", "threshold" : 0.6}
Dbconfig.txt
hostname 10.10.10.10
username hadoop
passwd sayan123
dbname model
The following command will run the code:
spark-submit --packages mysql:mysql-connector-java:5.1.39,com.databricks:spark-csv_2.10:1.1.0 --driver-memory 5g --executor-memory 10g --conf spark.driver.maxResultSize=5g --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.python.worker.memory=5g --conf spark.shuffle.memoryFraction=0.5 build_model_first.py

Now we will discuss something crucial about the Spark memory model. Spark performs its operations in memory, and we can change the default size of the execution memory using command-line parameters. Spark can handle files that are larger than its memory, but it must spill to disk to do so. For example, the input log file was larger than the Spark memory size, but the program still successfully filters the important lines from it. There is one exception, however: a join requires the memory for both data frames. Because we use a join here, we make the code as memory-optimized as possible by manually releasing cached data via the unpersist function. This slows the code down; if you remove those calls, the program runs faster but uses more memory.
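
As a minimal sketch of the trade-off (hypothetical file paths, assuming an existing SparkSession named spark): cache() keeps a data frame in memory for reuse, while unpersist() releases that memory early at the cost of recomputation if the data is needed again.
big = spark.read.csv("/tmp/big.csv", header=True)
small = spark.read.csv("/tmp/small.csv", header=True)

joined = big.join(small, on="id", how="inner")
joined.cache()                                   # keep the join result in memory
print(joined.count())                            # first action materializes and caches it
print(joined.groupBy("id").count().count())      # reuses the cached result

joined.unpersist()                               # release the memory once it is no longer needed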

Analytics in the Cloud

Like many other fields, analytics is being impacted by the cloud, in two ways. First, the big cloud providers are continuously releasing machine learning APIs, so a developer can easily write a machine learning application without worrying about the underlying algorithm. For example, Google provides APIs for computer vision, natural language, speech processing, and much more. A user can write code that detects the sentiment of a piece of text, a face image, or a voice clip in two or three lines of code.
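
As an illustration, a sentiment query against Google's Natural Language API can be this short (a sketch assuming the google-cloud-language client library; the exact class and argument names vary by library version):
from google.cloud import language_v1

client = language_v1.LanguageServiceClient()
document = language_v1.Document(content="The service was quick and the staff were friendly.",
                                type_=language_v1.Document.Type.PLAIN_TEXT)
sentiment = client.analyze_sentiment(request={"document": document}).document_sentiment
print(sentiment.score, sentiment.magnitude)   # polarity and strength of the sentiment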

The second aspect of the cloud is the data engineering part. In Chapter 1 we gave an example of how to expose a model as a high-performance REST API using Falcon. Now, if a million users are going to use it and the load varies significantly, then auto-scaling is a required feature of this application. If you deploy the application in Google App Engine or AWS Lambda, you can get auto-scaling in 15 minutes. Once the application auto-scales, you need to think about the database. DynamoDB from Amazon and Cloud Datastore from Google are auto-scaled databases in the cloud. If you use one of them, your application is now high performance and auto-scaled, but people around the globe will access it, so geographical distance will add latency and hurt performance. You also have to make sure that your application is always available. Therefore, you need to deploy your application in three regions: Europe, Asia, and the United States (you can choose more regions if your budget permits). If you use an elastic load balancer with a geo-balancing routing rule, which routes traffic from a region to the app engine of that region, the application will be available across the globe. In geo-balancing, you can specify a secondary app engine for each rule, which makes your application highly available: if a primary app engine is down, the secondary one takes over.

Figure 7-6 describes this system.

Figure 7-6. The system: an elastic load balancer in front of three instances, with primary regions Asia, US, and Europe and secondary regions US, Europe, and Asia

In Chapter 1, we showed some example code for publishing a deep learning model as a REST API. The following code implements the same logic in a cloud environment, with the original storage replaced by Google Cloud Datastore:
# Importing libraries
import falcon
from falcon_cors import CORS
import json
import pygeoip
import json
import datetime as dt
import ipaddress
import math
from concurrent.futures import *
import numpy as np
from google.cloud import datastore
# logistic function
def logit(x):
       return (np.exp(x) / (1 + np.exp(x)))
def is_visible(client_size, ad_position):
        y=height=0
        try:
                height  = int(client_size.split(',')[1])
                y = int(ad_position.split(',')[1])
        except:
                pass
        if y < height:
                return ("1")
        else:
                return ("0")
# Predictor class having all the required functions
class Predictor(object):
# constructor
       def __init__(self,domain,is_big):
              self.client = datastore.Client('sulvo-east')
              self.ctr = 'ctr_' + domain
              self.ip = "ip_" + domain
              self.scores = "score_num_" + domain
              self.probabilities = "probability_num_" + domain
              if is_big:
                    self.is_big = "is_big_num_" + domain
                    self.scores_big = "score_big_num_" + domain
                    self.probabilities_big = "probability_big_num_" + domain
              self.gi = pygeoip.GeoIP('GeoIP.dat')
              self.big = is_big
              self.domain = domain
       def get_hour(self,timestamp):
              return dt.datetime.utcfromtimestamp(timestamp / 1e3).hour
# to fetch score
       def fetch_score(self, featurename, featurevalue, kind):
              pred = 0
              try:
                     key = self.client.key(kind,featurename + "_" + featurevalue)
                     res= self.client.get(key)
                     if res is not None:
                           pred = res['score']
              except:
                     pass
              return pred
# function to calculate score
       def get_score(self, featurename, featurevalue):
              with ThreadPoolExecutor(max_workers=5) as pool:
                     future_score = pool.submit(self.fetch_score,featurename, featurevalue,self.scores)
                    future_prob = pool.submit(self.fetch_score,featurename, featurevalue,self.probabilities)
                    if self.big:
                           future_howbig = pool.submit(self.fetch_score,featurename, featurevalue,self.is_big)
                           future_predbig = pool.submit(self.fetch_score,featurename, featurevalue,self.scores_big)
                           future_probbig = pool.submit(self.fetch_score,featurename, featurevalue,self.probabilities_big)
                    pred = future_score.result()
                    prob = future_prob.result()
                    if not self.big:
                           return pred, prob
                    howbig = future_howbig.result()
                    pred_big = future_predbig.result()
                    prob_big = future_probbig.result()
                    return (howbig, pred, prob, pred_big, prob_big)
       def get_value(self, f, value):
              if f == 'visible':
                     fields = value.split("_")
                     value = is_visible(fields[0], fields[1])
              if f == 'ip':
                     ip = str(ipaddress.IPv4Address(ipaddress.ip_address(value)))
                     geo = self.gi.country_name_by_addr(ip)
                     if self.big:
                            howbig1, pred1, prob1, pred_big1, prob_big1 = self.get_score('geo', geo)
                     else:
                            pred1, prob1 = self.get_score('geo', geo)
                     freq = '1'
                     key = self.client.key(self.ip, ip)
                     res = self.client.get(key)
                     if res is not None:
                            freq = res['ip']
                     if self.big:
                            howbig2, pred2, prob2, pred_big2, prob_big2 = self.get_score('frequency', freq)
                     else:
                            pred2, prob2 = self.get_score('frequency', freq)
                     if self.big:
                            return ((howbig1 + howbig2), (pred1 + pred2), (prob1 + prob2), (pred_big1 + pred_big2), (prob_big1 + prob_big2))
                     else:
                            return ((pred1 + pred2), (prob1 + prob2))
              if f == 'root':
                     try:
                            key = self.client.key('root', value)
                            res = self.client.get(key)
                            if res is not None:
                                   ctr = res['ctr']
                                   avt = res['avt']
                                   avv = res['avv']
                                   if self.big:
                                          (howbig1, pred1, prob1, pred_big1, prob_big1) = self.get_score('ctr', str(ctr))
                                          (howbig2, pred2, prob2, pred_big2, prob_big2) = self.get_score('avt', str(avt))
                                          (howbig3, pred3, prob3, pred_big3, prob_big3) = self.get_score('avv', str(avv))
                                          (howbig4, pred4, prob4, pred_big4, prob_big4) = self.get_score(f, value)
                                   else:
                                          (pred1, prob1) = self.get_score('ctr', str(ctr))
                                          (pred2, prob2) = self.get_score('avt', str(avt))
                                          (pred3, prob3) = self.get_score('avv', str(avv))
                                          (pred4, prob4) = self.get_score(f, value)
                            if self.big:
                                   return ((howbig1 + howbig2 + howbig3 + howbig4), (pred1 + pred2 + pred3 + pred4), (prob1 + prob2 + prob3 + prob4), (pred_big1 + pred_big2 + pred_big3 + pred_big4), (prob_big1 + prob_big2 + prob_big3 + prob_big4))
                            else:
                                   return ((pred1 + pred2 + pred3 + pred4), (prob1 + prob2 + prob3 + prob4))
                     except:
                            return (0, 0)
              if f == 'client_time':
                     value = str(self.get_hour(int(value)))
              return self.get_score(f, value)
       def get_multiplier(self):
              key = self.client.key('multiplier_all_num', self.domain)
              res = self.client.get(key)
              high = res['high']
              low = res['low']
              if self.big:
                     key = self.client.key('multiplier_all_num', self.domain + "_big")
                     res = self.client.get(key)
                     high_big = res['high']
                     low_big = res['low']
                     return (high, low, high_big, low_big)
              return (high, low)
# function to post back to ad server
       def on_post(self, req, resp):
             if True:
                    input_json = json.loads(req.stream.read(),encoding='utf-8')
                    input_json['visible'] = input_json['client_size'] + "_" + input_json['ad_position']
                    del input_json['client_size']
                    del input_json['ad_position']
                    howbig = 0
                    pred = 0
                    prob = 0
                    pred_big = 0
                    prob_big = 0
                    worker = ThreadPoolExecutor(max_workers=1)
                    thread = worker.submit(self.get_multiplier)
                    with ThreadPoolExecutor(max_workers=8) as pool:
                           future_array = { pool.submit(self.get_value,f,input_json[f]) : f for f in input_json}
                           for future in as_completed(future_array):
                                  if self.big:
                                         howbig1, pred1, prob1,pred_big1,prob_big1 = future.result()
                                         pred = pred + pred1
                                         pred_big = pred_big + pred_big1
                                         prob = prob + prob1
                                         prob_big = prob_big + prob_big1
                                          howbig = howbig + howbig1
                                  else:
                                         pred1, prob1 = future.result()
                                         pred = pred + pred1
                                         prob = prob + prob1
                    if self.big:
                           if howbig > .65:
                                  pred, prob = pred_big, prob_big
                    resp.status = falcon.HTTP_200
                     res = math.exp(pred)-1
                    if res < 0.1:
                           res = 0.1
                    if prob < 0.1 :
                           prob = 0.1
                    if prob > 0.9:
                           prob = 0.9
                    if self.big:
                           high, low, high_big, low_big = thread.result()
                           if howbig > 0.6:
                                  high = high_big
                                  low = low_big
                    else:
                           high, low = thread.result()
                    multiplier = low + (high -low)*prob
                    res = multiplier*res
                    resp.body = str(res)
              #except Exception,e:
              #     print(str(e))
              #     resp.status = falcon.HTTP_200
              #     resp.body = str("0.1")
cors = CORS(allow_all_origins=True,allow_all_methods=True,allow_all_headers=True)
wsgi_app = api = falcon.API(middleware=[cors.middleware])
f = open('publishers2.list_test')
for line in f:
       if "#" not in line:
             fields = line.strip().split(' ')
             domain = fields[0].strip()
             big = (fields[1].strip() == '1')
             p = Predictor(domain, big)
             url = '/predict/' + domain
             api.add_route(url, p)
f.close()
You can deploy this application in the Google App Engine with the following:
gcloud app deploy --project <project id> --version <version no>
The function’s flow is as follows:
  1. The ad server requests an impression prediction.

  2. When the main thread receives the request, the predictor creates a new thread for each feature.

  3. By sending concurrent requests to the data store, each thread calculates is big, floor, floor big, probability, and probability big for its feature. It returns 0 if the feature or value is not found in the data store.

  4. If is big indicates a high-value impression, the "big" scores are used; otherwise, the general scores are used.

  5. The predicted score is the sum of the scores over all features.

  6. The final predicted floor is calculated using the multiplier range obtained from the data store.

  7. The predicted floor value is returned to the ad server in the response.

Internet of Things

The IoT is simply the network of interconnected devices embedded with the sensors, software, network connectivity, and electronics that enable them to collect and exchange data, making them responsive. The field is emerging alongside technologies such as big data, real-time analytics frameworks, mobile communication, and intelligent programmable devices. In the IoT, you can analyze data on the server side using the techniques shown throughout this book; you can also put logic on the device side using a device such as the Raspberry Pi, a small single-board computer that can run Python.

Essential Architectural Patterns for Data Scientists

An analytics system is not an isolated entity. It must gather data from some application or system, store that data reliably, and construct a model on top of it, and the model must then be exposed as an API so it can interact with other systems. This API must sometimes be available around the world with a certain latency. So, a lot of engineering goes into creating a successful intelligent system, and in today's startup environment, which is a multibillion-dollar industry, an organization cannot afford to hire a large number of specialists to create a unique feature in its product. In the startup world, the data scientist must be a full-stack analytics expert. In the following sections, we'll walk through several scenarios with Tom, a fictitious character, to illustrate key architectural patterns that any data scientist should be aware of.

Scenario 1: Hot Potato Anti-Pattern

Tom is employed as a data scientist to work on a real-time analytics product for an online company. The initial step is to gather data from his organization's application. They use the cloud to auto-scale their storage, and they push data straight to the database from the application. In the test environment, everything appears to be in order. To ensure that there is no data loss, they use a TCP connection. When they go live, though, the main application, which has not been changed at all, crashes. The company faces a massive loss within half an hour, and Tom gets real-time feedback on the first step of his real-time analytics system: he is fired.

The question now is why the main application crashed when nothing in it had changed. From a classic computer science perspective, this is a busy-consumer problem. Here the main application is the sender of data, and the database is the consumer. When the consumer is busy, which is a common occurrence in any database with a large number of queries running against it, it cannot keep up with the incoming data. Furthermore, because the TCP connection guarantees delivery, the sender delivers the data again, which puts the load back on the sender, and the sender is the main application. The situation is analogous to passing a hot potato from one person to another, with the recipient returning the hot potato to the sender repeatedly. That's why it is called the Hot Potato anti-pattern. Figure 7-7 illustrates the sequence.

Figure 7-7

Sequence diagram describing the Hot Potato anti-pattern: the sender sends data to the receiver, which repeatedly returns an "unable to process" acknowledgment to the sender

There are two ways to address the situation. If the data passing between the sender and the receiver is not critical, we can use the UDP protocol, which simply drops data that it cannot deliver. This is one of the reasons UDP is used in network monitoring protocols such as SNMP and NetFlow: monitoring must not load the monitored device. However, if the data is essential, such as in the financial industry, we must place a message queue between the sender and the receiver. It acts as a buffer when the receiver is unable to process data. If the queue memory fills up, either the data is lost or the load is pushed back to the sender. ZMQ stands for ZeroMQ ("zero message queue"), which in its simplest form is little more than a thin, brokerless layer over plain sockets.
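Here is a minimal Python sketch of these two options; the host, port, and queue name are illustrative. The first sends the data fire-and-forget over UDP, so a busy receiver can never push load back onto the sender; the second buffers the data in Redis, modeled here as a plain Redis list rather than a full queue product, so a separate worker can drain it into the database at its own pace.
import json
import socket

import redis  # pip install redis

event = {"event": "impression", "value": 1}

# Option 1: fire-and-forget over UDP; data that cannot be delivered is simply dropped,
# so a busy receiver never pushes load back onto the sender.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(json.dumps(event).encode(), ("127.0.0.1", 9999))

# Option 2: when the data must not be lost, buffer it in a queue
# (a plain Redis list here) and let a separate worker drain it later.
r = redis.Redis(host="127.0.0.1", port=6379)
r.rpush("hadoopqueue", json.dumps(event))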

In cloud platforms, there are numerous ready-made solutions; we go through them in depth in Chapter 2. The following Node.js code shows an example of a collector that buffers incoming data in RSMQ (a Redis-backed message queue), exposes a REST API to the sender, and uses Google BigQuery as the receiver.

Code: Data Collector Module

var https = require("https");
var express = require('express');
var router = express.Router();
var _ = require('underscore');
var moment = require('moment-timezone');
var RedisSMQ = require('rsmq');
var RSMQWorker = require('rsmq-worker');
var rsmq_h = new RedisSMQ({host: '127.0.0.1', port: 6379, ns: 'rsmq'});
rsmq_h.createQueue({qname: 'hadoopqueue'}, function (err, resp) {
  if (err) {
    console.log(err.message);
  }
  if (resp === 1) {
    console.log('✔ Message Queue created');
  }
});
var worker_h = new RSMQWorker('hadoopqueue');
worker_h.on('message', function(data, next, id) {
  const BigQuery = require('@google-cloud/bigquery');
  const bigquery = BigQuery({
    projectId: 'sayantest1979'
  });
  const dataset = bigquery.dataset('adlog');
  const table = dataset.table('day1');
  var rows = JSON.parse(data);
  table.insert(rows)
    .then((insertErrors) => {
      console.log('Inserted:');
      rows.forEach((row) => console.log(row));
    });
  next();
});
worker_h.on('error', function(err, msg) {
  console.log('ERROR', err, msg.id);
});
worker_h.start();
router.post('/collect', function(req, res, next) {
  res.setHeader('Access-Control-Allow-Origin', '*');
  var data = req.body,
    dataArray = [];
  for (var k in data) {
    var kv = data[k].split('|');
    var dataObj = {};
    for (var i = 0; i < kv.length; i++) {
      var d = kv[i].split(':');
      dataObj[d[0]] = decodeURIComponent(d[1]);
    }
    dataObj['server_time'] = moment().toDate().valueOf();
    dataObj['ip'] = require('ipware')().get_ip(req).clientIp;
    var str = JSON.stringify(dataObj, require('json-decycle').decycle());
    dataArray.push(str);
  }
  var dataString = dataArray.join(', ');
  dataString = "[ " + dataString + " ]";
  rsmq_h.sendMessage({qname: 'hadoopqueue', message: dataString + ' '}, function (err, resp) {
    resp && console.log('Message sent. ID:', resp);
  });
  res.end('OK');
});
module.exports = router;
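For completeness, here is a hedged example of how the main application might hand events to this collector instead of writing to the database directly. The port and the payload keys are assumptions, but the "key:value" pairs joined by "|" mirror what the /collect route parses.
import requests

# Each form field is a "key:value|key:value" string, as expected by /collect.
event = {"e1": "campaign:summer_sale|clicks:42|country:IN"}

# Assumes the Express app mounting this router listens on port 3000
# with URL-encoded body parsing enabled.
requests.post("http://localhost:3000/collect", data=event, timeout=2)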

Now, let’s explore other important architectural patterns, proxy and layering.

Scenario 2: Proxy and Layering Patterns

Tom starts working at a new company. There is no job uncertainty because the company is large. He does not take the risk of gathering the data in this case. The data is stored on a MySQL server. Tom has no prior knowledge of the database, but he is keen to learn MySQL, and he writes lots of queries directly in his code. The database is owned by another team, and their manager likes R&D. So every Monday, Tom receives a call informing him that the database has changed, first from MySQL to Mongo and then from Mongo to SQL Server, requiring him to make modifications across the code. Tom is no longer unemployed, but he returns home from work every day at midnight.

Everyone seems to agree that the solution is to properly organize the code. To do so, it helps to understand the proxy and layering patterns. Instead of using a raw MySQL or Mongo connection in your code, employ a wrapper class as a proxy; this is the proxy pattern. Using the layering technique, divide your code into layers, each of which calls methods only from the layer below it. In this case, database configuration should be done at the lowest level, the core layer. Above that is the database utility layer, which contains the database queries. Above that is a business entity layer that makes use of the database utilities. The following Python code will help you see things more clearly. Tom now knows that if there are any changes at the database level, he must look at the core layer; if there are any changes in queries, he must look at the database utility layer; and if there are any changes in business actors, he must look at the entity layer. As a result, his life is now simple.

Database Core Layer

import MySQLdb
class MysqlDbWrapper(object):
    connection = None
    cursor = None
    def __init__(self, configpath):
        f = open(configpath)
        config = {}
        for line in f:
            fields = line.strip().split(' ')
            config[fields[0]] = fields[1].strip()
        f.close()
        self.connection = MySQLdb.connect(host=config['hostname'], user=config['username'], passwd=config['passwd'], db=config['dbname'])
        self.cursor = self.connection.cursor()
    def close_mysql(self):
        self.cursor.close()
        self.connection.close()
    def get_data(self, query):
        self.cursor.execute(query)
        return self.cursor.fetchall()
------------
from pymongo import MongoClient
class MongoDbWrapper(object):
    collection = None
    client = None
    db = None
    def __init__(self,configpath):
        f = open(configpath)
        mongoconfig = {}
        for line in f:
            fields = line.strip().split('=')
            mongoconfig[fields[0]] = fields[1].strip()
        f.close()
        self.client = MongoClient('mongodb://'+mongoconfig['usr']+':'+ mongoconfig['pswd']+'@'+mongoconfig['host']+'/'+mongoconfig['db'])
        self.db = self.client[mongoconfig['db']]
        self.collection = self.db[mongoconfig['collection']]
    def close_mongo(self):
        self.client.close()
    def get_data(self):
        return self.collection.find({"_type" : "node", "status":1})
Database Utility Layer
from mongo import MongoDbWrapper
import collections
import time
class MongoUtility(object):
    mongo_instance = None
    res = None
    def __init__(self, configpath):
        self.mongo_instance = MongoDbWrapper(configpath)
    def load_data(self):
        self.res = self.mongo_instance.get_data()
        # print(self.res)
    def shutdown(self):
        self.mongo_instance.close_mongo()
----------
from mysql import MysqlDbWrapper
class MysqlUtility(object):
    mysql_instance = None
    def __init__(self, configpath):
        self.mysql_instance = MysqlDbWrapper(configpath)
    def load_score(self):
        scores = {}
        data = self.mysql_instance.get_data("select * from category_score")
        for row in data:
            feature_name = row[1].strip()
            provider = row[0].strip()
            feature_value = row[2].strip()
            score = row[4]/row[3]
            # Key each score by provider, feature name, and feature value
            scores[(provider, feature_name, feature_value)] = score
        return scores
    def shutdown(self):
        self.mysql_instance.close_mysql()
The final layer will be an abstraction of a recommendation system, whose configuration is expressed in JSON like this:
{
  "all": {
    "block_categories": [916105, 1011724, 1011726, 1011727],
    "no_of_category": 8,
    "no_of_node": 40,
    "no_of_match": 2,
    "favourite_weight": 0.5,
    "default_category": [10037913, 10037914, 10053952],
    "conditional_block_categories": {
      "rule": [
        {
          "if": {
            "categories": [10244361],
            "relation": "or"
          },
          "then": {
            "categories": [10244361, 10057506],
            "relation": "or",
            "blockOrallow": false
          }
        }
      ]
    },
    "category_weightedge": {
      "rule": [
        {
          "origin": "Identifier",
          "weight": 0.5
        },
        {
          "origin": "previousnode",
          "weight": 0.5
        }
      ]
    }
  },
  "250": {}
}
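To round out the layering, here is a minimal sketch of the top (business entity) layer. The module and file names are illustrative, and it assumes that load_score returns a dictionary keyed by (provider, feature name, feature value) as in the utility layer above; the point is that this layer talks only to the utility layer and the JSON configuration, never to the database drivers directly.
import json
from mysqlutility import MysqlUtility  # hypothetical module name for the utility layer

class RecommendationEntity(object):
    def __init__(self, config_path, db_config_path):
        with open(config_path) as f:
            self.config = json.load(f)
        self.util = MysqlUtility(db_config_path)

    def recommend(self, site_id):
        # Fall back to the "all" block when no site-specific config exists.
        site_conf = self.config.get(str(site_id)) or self.config["all"]
        blocked = {str(c) for c in site_conf.get("block_categories", [])}
        scores = self.util.load_score()
        # Keep only feature values that are not blocked categories.
        return {key: s for key, s in scores.items() if key[2] not in blocked}

    def shutdown(self):
        self.util.shutdown()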

Thank You

We would like to thank you for reading this book. We hope you have enjoyed reading it as much as we enjoyed writing it, and we hope it helps you make your mark on the machine learning community. We encourage you to keep practicing on different data sets and to contribute to the open-source community. You can always contact us through the publisher. Thank you once again, and best wishes!
