Naive Bayes in practice

The first step is to choose some data that will be used for classification. We have chosen some data from the UK Government data website at http://data.gov.uk/dataset/road-accidents-safety-data.

The dataset is called Road Safety - Digital Breath Test Data 2013, which downloads a zipped text file called DigitalBreathTestData2013.txt. This file contains around half a million rows. The data looks as follows:

Reason,Month,Year,WeekType,TimeBand,BreathAlcohol,AgeBand,Gender
Suspicion of Alcohol,Jan,2013,Weekday,12am-4am,75,30-39,Male
Moving Traffic Violation,Jan,2013,Weekday,12am-4am,0,20-24,Male
Road Traffic Collision,Jan,2013,Weekend,12pm-4pm,0,20-24,Female

In order to classify the data, we have modified both the column layout and the number of columns. We have simply used Excel, given the data volume. However, if our data size had been in the big data range, we would have had to run some Scala code on top of Apache Spark for ETL (Extract Transform Load). As the following commands show, the data now resides in HDFS in the directory named /data/spark/nbayes. The file name is called DigitalBreathTestData2013- MALE2.csv. The line count from the Linux wc command shows that there are 467,000 rows. Finally, the following data sample shows that we have selected the columns, Gender, Reason, WeekType, TimeBand, BreathAlcohol, and AgeBand to classify. We will try to classify on the Gender column using the other columns as features:

[hadoop@hc2nn ~]$ hdfs dfs -cat /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv | wc -l
467054
[hadoop@hc2nn ~]$ hdfs dfs -cat /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv | head -5
Male,Suspicion of Alcohol,Weekday,12am-4am,75,30-39
Male,Moving Traffic Violation,Weekday,12am-4am,0,20-24
Male,Suspicion of Alcohol,Weekend,4am-8am,12,40-49
Male,Suspicion of Alcohol,Weekday,12am-4am,0,50-59
Female,Road Traffic Collision,Weekend,12pm-4pm,0,20-24

The Apache Spark MLlib classification function uses a data structure called LabeledPoint, which is a general purpose data representation defined at http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.mllib.regression.LabeledPoint and https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point.

This structure only accepts double values, which means that the text values in the previous data need to be classified numerically. Luckily, all of the columns in the data will convert to numeric categories, and we have provided a program in the software package with this book under the chapter2 aive bayes directory to do just that. It is called convert.scala. It takes the contents of the DigitalBreathTestData2013- MALE2.csv file and converts each record into a double vector.

The directory structure and files for an sbt Scala-based development environment have already been described earlier. We are developing our Scala code on the Linux server using the Linux account, Hadoop. Next, the Linux pwd and ls commands show our top-level nbayes development directory with the bayes.sbt configuration file, whose contents have already been examined:

[hadoop@hc2nn nbayes]$ pwd
/home/hadoop/spark/nbayes
[hadoop@hc2nn nbayes]$ ls
bayes.sbt target project src

The Scala code to run the Naive Bayes example is in the src/main/scala subdirectory under the nbayes directory:

[hadoop@hc2nn scala]$ pwd
/home/hadoop/spark/nbayes/src/main/scala
[hadoop@hc2nn scala]$ ls
bayes1.scala convert.scala

We will examine the bayes1.scala file later, but first, the text-based data on HDFS must be converted into numeric double values. This is where the convert.scala file is used. The code is as follows:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

These lines import classes for the Spark context, the connection to the Apache Spark cluster, and the Spark configuration. The object that is being created is called convert1. It is an application as it extends the App class:

object convert1 extends App
{

The next line creates a function called enumerateCsvRecord. It has a parameter called colData, which is an array of Strings and returns String:

def enumerateCsvRecord( colData:Array[String]): String =
{

The function then enumerates the text values in each column, so, for instance, Male becomes 0. These numeric values are stored in values such as colVal1:

   val colVal1 =
colData(0) match
{
case "Male" => 0
case "Female" => 1
case "Unknown" => 2
case _ => 99
}

val colVal2 =
colData(1) match
{
case "Moving Traffic Violation" => 0
case "Other" => 1
case "Road Traffic Collision" => 2
case "Suspicion of Alcohol" => 3
case _ => 99
}

val colVal3 =
colData(2) match
{
case "Weekday" => 0
case "Weekend" => 0
case _ => 99
}

val colVal4 =
colData(3) match
{
case "12am-4am" => 0
case "4am-8am" => 1
case "8am-12pm" => 2
case "12pm-4pm" => 3
case "4pm-8pm" => 4
case "8pm-12pm" => 5
case _ => 99
}

val colVal5 = colData(4)
val colVal6 =
colData(5) match
{
case "16-19" => 0
case "20-24" => 1
case "25-29" => 2
case "30-39" => 3
case "40-49" => 4
case "50-59" => 5
case "60-69" => 6
case "70-98" => 7
case "Other" => 8
case _ => 99
}
A comma-separated string called lineString is created from the numeric column values and is then returned. The function closes with the final brace character. Note that the data line created next starts with a label value at column one and is followed by a vector, which represents the data.

The vector is space-separated while the label is separated from the vector by a comma. Using these two separator types allows us to process both--the label and vector--in two simple steps:

   val lineString = colVal1+","+colVal2+" "+colVal3+" "+colVal4+" "+colVal5+" "+colVal6
return lineString
}

The main script defines the HDFS server name and path. It defines the input file and the output path in terms of these values. It uses the Spark URL and application name to create a new configuration. It then creates a new context or connection to Spark using these details:

val hdfsServer = "hdfs://localhost:8020"
val hdfsPath = "/data/spark/nbayes/"
val inDataFile = hdfsServer + hdfsPath + "DigitalBreathTestData2013-MALE2.csv"
val outDataFile = hdfsServer + hdfsPath + "result"
val sparkMaster = "spark://localhost:7077"
val appName = "Convert 1"
val sparkConf = new SparkConf()
sparkConf.setMaster(sparkMaster)
sparkConf.setAppName(appName)
val sparkCxt = new SparkContext(sparkConf)

The CSV-based raw data file is loaded from HDFS using the Spark context textFile method. Then, a data row count is printed:

val csvData = sparkCxt.textFile(inDataFile)
println("Records in : "+ csvData.count() )

The CSV raw data is passed line by line to the enumerateCsvRecord function. The returned string-based numeric data is stored in the enumRddData variable:

 val enumRddData = csvData.map
{
csvLine =>
val colData = csvLine.split(',')
enumerateCsvRecord(colData)
}

Finally, the number of records in the enumRddData variable is printed, and the enumerated data is saved to HDFS:

 println("Records out : "+ enumRddData.count() )
enumRddData.saveAsTextFile(outDataFile)
} // end object

In order to run this script as an application against Spark, it must be compiled. This is carried out with the sbt package command, which also compiles the code. The following command is run from the nbayes directory:

[hadoop@hc2nn nbayes]$ sbt package
Loading /usr/share/sbt/bin/sbt-launch-lib.bash
....
[info] Done packaging.
[success] Total time: 37 s, completed Feb 19, 2015 1:23:55 PM

This causes the compiled classes that are created to be packaged into a JAR library, as shown here:

[hadoop@hc2nn nbayes]$ pwd
/home/hadoop/spark/nbayes
[hadoop@hc2nn nbayes]$ ls -l target/scala-2.10
total 24
drwxrwxr-x 2 hadoop hadoop 4096 Feb 19 13:23 classes
-rw-rw-r-- 1 hadoop hadoop 17609 Feb 19 13:23 naive-bayes_2.10-1.0.jar

The convert1 application can now be run against Spark using the application name, Spark URL, and full path to the JAR file that was created. Some extra parameters specify memory and the maximum cores that are supposed to be used:

spark-submit 
--class convert1
--master spark://localhost:7077
--executor-memory 700M
--total-executor-cores 100
/home/hadoop/spark/nbayes/target/scala-2.10/naive-bayes_2.10-1.0.jar

This creates a data directory on HDFS called /data/spark/nbayes/ followed by the result, which contains part files with the processed data:

[hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes
Found 2 items
-rw-r--r-- 3 hadoop supergroup 24645166 2015-01-29 21:27 /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv
drwxr-xr-x - hadoop supergroup 0 2015-02-19 13:36 /data/spark/nbayes/result
[hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes/result
Found 3 items
-rw-r--r-- 3 hadoop supergroup 0 2015-02-19 13:36 /data/spark/nbayes/result/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 2828727 2015-02-19 13:36 /data/spark/nbayes/result/part-00000
-rw-r--r-- 3 hadoop supergroup 2865499 2015-02-19 13:36 /data/spark/nbayes/result/part-00001

In the following HDFS cat command, we concatenated the part file data into a file called DigitalBreathTestData2013-MALE2a.csv. We then examined the top five lines of the file using the head command to show that it is numeric. Finally, we loaded it in HDFS with the put command:

[hadoop@hc2nn nbayes]$ hdfs dfs -cat /data/spark/nbayes/result/part* > ./DigitalBreathTestData2013-MALE2a.csv 
[hadoop@hc2nn nbayes]$ head -5 DigitalBreathTestData2013-MALE2a.csv
0,3 0 0 75 3
0,0 0 0 0 1
0,3 0 1 12 4
0,3 0 0 0 5
1,2 0 3 0 1
[hadoop@hc2nn nbayes]$ hdfs dfs -put ./DigitalBreathTestData2013-MALE2a.csv /data/spark/nbayes

The following HDFS ls command now shows the numeric data file stored on HDFS in the nbayes directory:

[hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes
Found 3 items
-rw-r--r-- 3 hadoop supergroup 24645166 2015-01-29 21:27 /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv
-rw-r--r-- 3 hadoop supergroup 5694226 2015-02-19 13:39 /data/spark/nbayes/DigitalBreathTestData2013-MALE2a.csv
drwxr-xr-x - hadoop supergroup 0 2015-02-19 13:36 /data/spark/nbayes/result

Now that the data has been converted into a numeric form, it can be processed with the MLlib Naive Bayes algorithm; this is what the Scala file, bayes1.scala, does. This file imports the same configuration and context classes as before. It also imports MLlib classes for Naive Bayes, vectors, and the LabeledPoint structure. The application class that is created this time is called bayes1:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object bayes1 extends App {

The HDFS data file is again defined, and a Spark context is created as before:

 val hdfsServer = "hdfs://localhost:8020"
val hdfsPath = "/data/spark/nbayes/"
val dataFile = hdfsServer+hdfsPath+"DigitalBreathTestData2013-MALE2a.csv"
val sparkMaster = "spark://loclhost:7077"
val appName = "Naive Bayes 1"
val conf = new SparkConf()
conf.setMaster(sparkMaster)
conf.setAppName(appName)
val sparkCxt = new SparkContext(conf)

The raw CSV data is loaded and split by the separator characters. The first column becomes the label (Male/Female) that the data will be classified on. The final columns separated by spaces become the classification features:

 val csvData = sparkCxt.textFile(dataFile)
val ArrayData = csvData.map {
csvLine =>
val colData = csvLine.split(',')
LabeledPoint(colData(0).toDouble,
Vectors.dense(colData(1)
.split('')
.map(_.toDouble)
)
)
}

The data is then randomly divided into training (70%) and testing (30%) datasets:

 val divData = ArrayData.randomSplit(Array(0.7, 0.3), seed = 13L) 
val trainDataSet = divData(0)
val testDataSet = divData(1)

The Naive Bayes MLlib function can now be trained using the previous training set. The trained Naive Bayes model, held in the nbTrained variable, can then be used to predict the Male/Female result labels against the testing data:

 val nbTrained = NaiveBayes.train(trainDataSet)
val nbPredict = nbTrained.predict(testDataSet.map(_.features))

Given that all of the data already contained labels, the original and predicted labels for the test data can be compared. An accuracy figure can then be computed to determine how accurate the predictions were, by comparing the original labels with the prediction values:

 val predictionAndLabel = nbPredict.zip(testDataSet.map(_.label))
val accuracy = 100.0 * predictionAndLabel.filter(x => x._1 == x._2).count() /
testDataSet.count()
println( "Accuracy : " + accuracy );
}

So, this explains the Scala Naive Bayes code example. It's now time to run the compiled bayes1 application using spark-submit and determine the classification accuracy. The parameters are the same. It's just the class name that has changed:

spark-submit 
--class bayes1
--master spark://hc2nn.semtech-solutions.co.nz:7077
--executor-memory 700M
--total-executor-cores 100
/home/hadoop/spark/nbayes/target/scala-2.10/naive-bayes_2.10-1.0.jar

The resulting accuracy given by the Spark cluster is just 43 percent, which seems to imply that this data is not suitable for Naive Bayes:

Accuracy: 43.30

Luckily we'll introduce artificial neural networks later in the chapter, a more powerful classifier. In the next example, we will use K-Means to try to determine what clusters exist within the data. Remember, Naive Bayes needs the data classes to be linearly separable along the class boundaries. With K-Means, it will be possible to determine both: the membership and centroid location of the clusters within the data.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.15.137.75