ANN in practice

In order to begin ANN training, test data is needed. Given that this type of classification method is supposed to be good at classifying distorted or noisy images, we decided to attempt to classify the images here:

They are hand-crafted text files that contain shaped blocks, created from the characters 1 and 0. When they are stored on HDFS, the carriage return characters are removed so that the image is presented as a single line vector. So, the ANN will be classifying a series of shape images and then will be tested against the same images with noise added to determine whether the classification will still work. There are six training images, and they will each be given an arbitrary training label from 0.1 to 0.6. So, if the ANN is presented with a closed square, it should return a label of 0.1. The following image shows an example of a testing image with noise added.

The noise, created by adding extra zero (0) characters within the image, has been highlighted:
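
The preparation of these files can be sketched in plain Scala. This is only an illustration, not the book's code: the ShapeImage object and its flatten and addNoise methods are hypothetical names, and the sketch assumes that the pixel values are separated by spaces, as the later parsing code suggests.

```scala
// Hypothetical helpers, not part of the book's code.
object ShapeImage {

  // Join the rows of a multi-line block image into one space-separated line,
  // mirroring the removal of line breaks before the file is stored on HDFS.
  def flatten(image: String): String =
    image.split("\r?\n").map(_.trim).filter(_.nonEmpty).mkString(" ")

  // Return a copy of the flattened image with the cells at the given
  // (zero-based) positions overwritten by "0", imitating the added noise.
  def addNoise(line: String, positions: Set[Int]): String =
    line.split(" ").zipWithIndex.map { case (cell, i) =>
      if (positions.contains(i)) "0" else cell
    }.mkString(" ")
}
```

For a 3 x 3 closed square, ShapeImage.flatten("1 1 1\n1 0 1\n1 1 1") gives the line 1 1 1 1 0 1 1 1 1, and ShapeImage.addNoise applied to that line with Set(0, 8) zeroes the first and last cells.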

As before, the ANN code is developed using the Linux Hadoop account in a subdirectory called spark/ann. The ann.sbt file exists in the ann directory:

[hadoop@hc2nn ann]$ pwd

[hadoop@hc2nn ann]$ ls
ann.sbt project src target

The contents of the ann.sbt file have been changed to use full paths of JAR library files for the Spark dependencies:

name := "A N N"
version := "1.0"
scalaVersion := "2.11.2"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.8.1"
libraryDependencies += "org.apache.spark" % "spark-core" % "2.1.1"
libraryDependencies += "org.apache.spark" % "spark-mllib" % "2.1.1"
libraryDependencies += "org.apache.spark" % "akka" % "2.5.3"

As in the previous examples, the actual Scala code to be compiled exists in a subdirectory named src/main/scala. We have created two Scala programs. The first trains using the input data and then tests the ANN model with the same input data. The second tests the trained model with noisy data to test the distorted data classification:

[hadoop@hc2nn scala]$ pwd
[hadoop@hc2nn scala]$ ls
test_ann1.scala test_ann2.scala

We will examine the first Scala file in full and then show only the extra features of the second, as the two examples are very similar up to the point of training the ANN. The code examples shown here can be found in the software package provided with this book under the path chapter2ANN.

In the first Scala example, the import statements are similar to those of the previous examples. The Spark context, configuration, vectors, and LabeledPoint are imported. The RDD class for RDD processing is imported this time, along with the new ANN class, ANNClassifier. Note that the MLlib classification routines widely use the LabeledPoint structure for input data, which contains the features and the labels that the model is trained against:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.ANNClassifier
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD

object testann1 extends App {

The application class in this example has been called testann1. The HDFS files to be processed have been defined in terms of the HDFS server, path, and file name:

val server = "hdfs://localhost:8020"
val path = "/data/spark/ann/"

val data1 = server + path + "close_square.img"
val data2 = server + path + "close_triangle.img"
val data3 = server + path + "lines.img"
val data4 = server + path + "open_square.img"
val data5 = server + path + "open_triangle.img"
val data6 = server + path + "plus.img"

The Spark context has been created with the URL for the Spark instance, which now uses a different port number, 8077. The application name is ANN 1, which will appear on the Spark web UI when the application is run:

val sparkMaster = "spark://localhost:8077"
val appName = "ANN 1"
val conf = new SparkConf()

conf.setMaster(sparkMaster)
conf.setAppName(appName)

val sparkCxt = new SparkContext(conf)

The HDFS-based input training and test data files are loaded. The values on each line are split by space characters, and the numeric values have been converted into doubles. The variables that contain this data are then stored in an array called inputs. At the same time, an array called outputs is created, containing the labels from 0.1 to 0.6. These values will be used to classify the input patterns:

val rData1 = sparkCxt.textFile(data1).map(_.split(" ").map(_.toDouble)).collect
val rData2 = sparkCxt.textFile(data2).map(_.split(" ").map(_.toDouble)).collect
val rData3 = sparkCxt.textFile(data3).map(_.split(" ").map(_.toDouble)).collect
val rData4 = sparkCxt.textFile(data4).map(_.split(" ").map(_.toDouble)).collect
val rData5 = sparkCxt.textFile(data5).map(_.split(" ").map(_.toDouble)).collect
val rData6 = sparkCxt.textFile(data6).map(_.split(" ").map(_.toDouble)).collect
val inputs = Array[Array[Double]] (
rData1(0), rData2(0), rData3(0), rData4(0), rData5(0), rData6(0) )
val outputs = Array[Double]( 0.1, 0.2, 0.3, 0.4, 0.5, 0.6 )
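
To see why each rData value is indexed with (0), the parsing step can be reproduced without a cluster. The following is a minimal plain-Scala sketch, assuming each image file holds exactly one space-separated line; ParseImageLine is a hypothetical name that stands in for the textFile, map, and collect chain.

```scala
// Hypothetical stand-in for sparkCxt.textFile(...).map(...).collect,
// using an in-memory sequence of lines instead of an HDFS file.
object ParseImageLine {

  // Every line becomes one Array[Double], so a one-line file yields a
  // one-element outer array whose row is read with index 0.
  def parse(lines: Seq[String]): Array[Array[Double]] =
    lines.map(_.split(" ").map(_.toDouble)).toArray
}
```

Because each image was flattened to a single line, ParseImageLine.parse(Seq("1 0 1 1")) returns one row, and that row at index 0 is the feature vector for the image.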

The input and output data, representing the input data features and labels, are then combined and converted into a LabeledPoint structure. Finally, the data is parallelised in order to partition it for optimal parallel processing:

val ioData = inputs.zip( outputs )
val lpData = ioData.map{ case(features,label) =>
  LabeledPoint( label, Vectors.dense(features) )
}
val rddData = sparkCxt.parallelize( lpData )
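
The pairing of features with labels can be illustrated in plain Scala. This sketch uses tuples rather than LabeledPoint so that it runs without Spark; LabelPairing and pair are hypothetical names. The length check is an addition for illustration, included because zip silently drops unmatched elements when the two arrays differ in length.

```scala
// Hypothetical helper, not part of the book's code.
object LabelPairing {

  // Pair each feature vector with the label at the same position.
  // The lengths are checked first because zip would otherwise truncate
  // the longer array without any warning.
  def pair(inputs: Array[Array[Double]],
           labels: Array[Double]): Array[(Array[Double], Double)] = {
    require(inputs.length == labels.length,
      s"expected ${inputs.length} labels but got ${labels.length}")
    inputs.zip(labels)
  }
}
```

With six input patterns and six labels, the first pattern is paired with 0.1, the second with 0.2, and so on; a mismatch in counts fails immediately instead of producing a shortened training set.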

Variables are created to define the hidden layer topology of the ANN. In this case, we have chosen to have two hidden layers, each with 100 neurons. The maximum number of iterations is defined, as well as a batch size (six patterns) and a convergence tolerance. The tolerance sets how large the training error may be while training is still considered to have succeeded. Then, an ANN model is created using these configuration parameters and the input data:

val hiddenTopology : Array[Int] = Array( 100, 100 )
val maxNumIterations = 1000
val convTolerance = 1e-4
val batchSize = 6
val annModel = ANNClassifier.train(rddData,
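
To get a feel for what a hidden topology of two 100-neuron layers implies, the number of trainable values can be estimated. This is a rough sketch under stated assumptions: a fully connected feed-forward network with one bias per neuron, which may not match the internals of ANNClassifier exactly. TopologySize and parameterCount are hypothetical names, and the input size used in the note that follows is illustrative because the image dimensions are not given here.

```scala
// Hypothetical helper, not part of the book's code.
object TopologySize {

  // Count the weights and biases of a fully connected feed-forward network.
  // layerSizes lists every layer, input layer first and output layer last.
  def parameterCount(layerSizes: Seq[Int]): Long = {
    require(layerSizes.nonEmpty, "at least one layer is required")
    layerSizes.zip(layerSizes.tail).map { case (in, out) =>
      (in.toLong + 1) * out // one weight per input plus one bias, per neuron
    }.sum
  }
}
```

For example, with an assumed 25-value input image and a single output, TopologySize.parameterCount(Seq(25, 100, 100, 1)) counts 2600 + 10100 + 101 values, which is far more than the six training patterns and helps explain why the network can reproduce its training labels.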

In order to test the trained ANN model, the same input training data is used as testing data to obtain prediction labels. First, an input data variable is created called rPredictData. Then, the data is partitioned and, finally, the predictions are obtained using the trained ANN model. For this model to work, it must output the labels, 0.1 to 0.6:

val rPredictData = inputs.map{ case(features) =>
  ( Vectors.dense(features) )
}
val rddPredictData = sparkCxt.parallelize( rPredictData )
val predictions = annModel.predict( rddPredictData )

The label predictions are collected and printed, and the script ends with a closing brace:

predictions.collect().foreach( value => println( "prediction > " + value ) )
} // end ann1

So, in order to run this code sample, it must first be compiled and packaged. By now, you should be familiar with the sbt command, executed from the ann subdirectory:

[hadoop@hc2nn ann]$ pwd
[hadoop@hc2nn ann]$ sbt package

The spark-submit command is then used from within the new spark/spark path using the new Spark-based URL at port 8077 to run the application, testann1:

--class testann1
--master spark://localhost:8077
--executor-memory 700M
--total-executor-cores 100

By checking the Apache Spark web URL at http://localhost:19080/, it is now possible to see the application running. The following figure shows the ANN 1 application running as well as the previously completed executions:

By selecting one of the cluster host worker instances, it is possible to see a list of executors that actually carry out cluster processing for that worker:

Finally, by selecting one of the executors, it is possible to see its history and configuration as well as links to the log file and error information. At this level, with the log information provided, debugging is possible. These log files can be checked to process error messages:

The ANN 1 application provides the following output to show that it has reclassified the same input data correctly. The reclassification has been successful as each of the input patterns has been given the same label that it was trained with:

prediction > 0.1
prediction > 0.2
prediction > 0.3
prediction > 0.4
prediction > 0.5
prediction > 0.6

So, this shows that ANN training and test prediction will work with the same data. Now, we will train with the same data, but test with distorted or noisy data, an example of which we already demonstrated. This example can be found in the file called test_ann2.scala in your software package. It is very similar to the first example, so we will just demonstrate the changed code. The application is now called testann2:

object testann2 extends App

An extra set of testing data is created, after the ANN model has been created using the training data. This testing data contains noise:

val tData1 = server + path + "close_square_test.img"
val tData2 = server + path + "close_triangle_test.img"
val tData3 = server + path + "lines_test.img"
val tData4 = server + path + "open_square_test.img"
val tData5 = server + path + "open_triangle_test.img"
val tData6 = server + path + "plus_test.img"

This data is processed into input arrays and partitioned for cluster processing:

val rtData1 = sparkCxt.textFile(tData1).map(_.split(" ").map(_.toDouble)).collect
val rtData2 = sparkCxt.textFile(tData2).map(_.split(" ").map(_.toDouble)).collect
val rtData3 = sparkCxt.textFile(tData3).map(_.split(" ").map(_.toDouble)).collect
val rtData4 = sparkCxt.textFile(tData4).map(_.split(" ").map(_.toDouble)).collect
val rtData5 = sparkCxt.textFile(tData5).map(_.split(" ").map(_.toDouble)).collect
val rtData6 = sparkCxt.textFile(tData6).map(_.split(" ").map(_.toDouble)).collect
val tInputs = Array[Array[Double]] (
rtData1(0), rtData2(0), rtData3(0), rtData4(0), rtData5(0), rtData6(0) )

val rTestPredictData = tInputs.map{ case(features) => ( Vectors.dense(features) ) }
val rddTestPredictData = sparkCxt.parallelize( rTestPredictData )

It is then used to generate label predictions in the same way as the first example. If the model classifies the data correctly, then the same label values should be printed from 0.1 to 0.6:

val testPredictions = annModel.predict( rddTestPredictData )
testPredictions.collect().foreach( value => println( "test prediction > " + value ) )

The code has already been compiled, so it can be run using the spark-submit command:

--class testann2
--master spark://localhost:8077
--executor-memory 700M
--total-executor-cores 100

Here is the cluster output from this script, which shows a successful classification using a trained ANN model and some noisy test data. The noisy data has been classified correctly. For instance, if the trained model had become confused, it might have given a value of 0.15 for the noisy close_square_test.img test image in position one, instead of returning 0.1 as it did:

test prediction > 0.1
test prediction > 0.2
test prediction > 0.3
test prediction > 0.4
test prediction > 0.5
test prediction > 0.6
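
Rather than comparing printed values by eye, the check can be automated. The following plain-Scala sketch assumes that the predictions have already been collected into ordinary Double values; LabelCheck, nearestLabel, and matches are hypothetical names, and the default tolerance of 0.01 is an arbitrary choice for illustration.

```scala
// Hypothetical helper, not part of the book's code.
object LabelCheck {

  // The six arbitrary training labels used in this example.
  val labels: Seq[Double] = Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6)

  // Snap a raw prediction to the closest training label.
  def nearestLabel(prediction: Double): Double =
    labels.minBy(label => math.abs(label - prediction))

  // True when a prediction lies within the tolerance of the expected label.
  def matches(prediction: Double, expected: Double,
              tolerance: Double = 0.01): Boolean =
    math.abs(prediction - expected) <= tolerance
}
```

With these helpers, a prediction of 0.1 for the noisy closed square passes LabelCheck.matches against the label 0.1, whereas a confused value such as 0.15 does not.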
