Example code – income

This section examines the Scala-based H2O Sparkling Water Deep Learning example using the previous Canadian income data source. First, the Spark (Context, Conf, MLlib, and RDD) and H2O (h2o, deeplearning, and water) classes are imported:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import hex.deeplearning.{DeepLearningModel, DeepLearning}
import hex.deeplearning.DeepLearningModel.DeepLearningParameters
import org.apache.spark.h2o._
import org.apache.spark.mllib
import org.apache.spark.mllib.feature.{IDFModel, IDF, HashingTF}
import org.apache.spark.rdd.RDD
import water.Key

Next, an application object called h2o_spark_dl2 is defined, the master URL is set, and a configuration object is created from this URL and the application name.

The Spark context is then created using the configuration object:

object h2o_spark_dl2 extends App
{
  val sparkMaster = "spark://localhost:7077"
  val appName = "Spark h2o ex1"
  val conf = new SparkConf()

  conf.setMaster(sparkMaster)
  conf.setAppName(appName)

  val sparkCxt = new SparkContext(conf)

An H2O context is created from the Spark context, and also an SQL context:

  import org.apache.spark.h2o._
  implicit val h2oContext = new org.apache.spark.h2o.H2OContext(sparkCxt).start()

  import h2oContext._
  import org.apache.spark.sql._

  implicit val sqlContext = new SQLContext(sparkCxt)

The H2O Flow user interface is started with the openFlow command:

  import sqlContext._
  openFlow

The training and testing data files are now defined (on HDFS) using the server URL, path, and filenames:

  val server = "hdfs://localhost:8020"
  val path = "/data/spark/h2o/"

  val train_csv = server + path + "adult.train.data" // 32,562 rows
  val test_csv = server + path + "adult.test.data" // 16,283 rows

The CSV-based training and testing data is loaded using the Spark context's textFile method:

  val rawTrainData = sparkCxt.textFile(train_csv)
  val rawTestData = sparkCxt.textFile(test_csv)

Now the schema is defined in terms of a string of attribute names. A schema variable is then created by splitting the string on spaces and mapping each column name to a StructField. The data types are left as String, and the true value marks each field as nullable, which allows for null values in the data:

  val schemaString = "age workclass fnlwgt education " +
    "educationalnum maritalstatus occupation relationship race gender " +
    "capitalgain capitalloss hoursperweek nativecountry income"
  val schema = StructType( schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, true)))
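Because every later step addresses the columns by position (0 to 14), it can help to confirm that the schema string really yields fifteen names. The following is a minimal plain-Scala sketch that runs without Spark. The object name SchemaCheck is hypothetical, and splitting on runs of whitespace rather than on a single space is an assumption made here so that an accidental double space does not produce an empty field name:

```scala
object SchemaCheck {
  // Same attribute list as in the example, assembled from three literals.
  val schemaString: String =
    "age workclass fnlwgt education " +
    "educationalnum maritalstatus occupation relationship race gender " +
    "capitalgain capitalloss hoursperweek nativecountry income"

  // Split on runs of whitespace so that stray double spaces are tolerated.
  val fieldNames: Array[String] = schemaString.trim.split("\\s+")

  def main(args: Array[String]): Unit = {
    println(s"${fieldNames.length} fields, last = ${fieldNames.last}")
  }
}
```

If the count were anything other than fifteen, the Row construction that follows would no longer line up with the schema.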

Each raw CSV line of the training and testing data is now split by commas into columns. Empty lines are filtered out, as are rows whose last column (income) is empty. The actual data rows are then created from the fifteen (0-14) trimmed elements of each raw CSV line. Both the training and the test datasets are processed in the same way:

  val trainRDD = rawTrainData
    .filter(!_.isEmpty)
    .map(_.split(","))
    .filter( rawRow => ! rawRow(14).trim.isEmpty )
    .map(rawRow => Row(
      rawRow(0).toString.trim, rawRow(1).toString.trim,
      rawRow(2).toString.trim, rawRow(3).toString.trim,
      rawRow(4).toString.trim, rawRow(5).toString.trim,
      rawRow(6).toString.trim, rawRow(7).toString.trim,
      rawRow(8).toString.trim, rawRow(9).toString.trim,
      rawRow(10).toString.trim, rawRow(11).toString.trim,
      rawRow(12).toString.trim, rawRow(13).toString.trim,
      rawRow(14).toString.trim
      )
    )

  val testRDD = rawTestData
    .filter(!_.isEmpty)
    .map(_.split(","))
    .filter( rawRow => ! rawRow(14).trim.isEmpty )
    .map(rawRow => Row(
      rawRow(0).toString.trim, rawRow(1).toString.trim,
      rawRow(2).toString.trim, rawRow(3).toString.trim,
      rawRow(4).toString.trim, rawRow(5).toString.trim,
      rawRow(6).toString.trim, rawRow(7).toString.trim,
      rawRow(8).toString.trim, rawRow(9).toString.trim,
      rawRow(10).toString.trim, rawRow(11).toString.trim,
      rawRow(12).toString.trim, rawRow(13).toString.trim,
      rawRow(14).toString.trim
      )
    )
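Note that the code above indexes rawRow(14) directly, so a line with fewer than fifteen columns would throw an ArrayIndexOutOfBoundsException instead of being filtered out. As a hedged alternative, the per-line logic can be sketched in plain Scala as a function that returns an Option. The names RowParser, ExpectedColumns, and parseLine are hypothetical and not part of the original example, and the sample line is only an illustration of the comma-separated layout:

```scala
object RowParser {
  val ExpectedColumns = 15

  // Returns the trimmed fields, or None when the line is blank, has the
  // wrong number of columns, or has an empty final (income) column.
  def parseLine(line: String): Option[Vector[String]] = {
    // A limit of -1 keeps trailing empty fields, so "a,b," has three columns.
    val fields = line.split(",", -1).map(_.trim).toVector
    if (fields.length == ExpectedColumns && fields.last.nonEmpty) Some(fields)
    else None
  }

  def main(args: Array[String]): Unit = {
    val sample = "39, State-gov, 77516, Bachelors, 13, Never-married, " +
      "Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, " +
      "United-States, <=50K"
    val lines = Seq(sample, "", "incomplete,row")
    // Only the well-formed sample line survives the flatMap.
    lines.flatMap(parseLine).foreach(row => println(row.mkString("|")))
  }
}
```

In the Spark version, the same function could be applied with flatMap on the raw RDD before the rows are built, which would replace the two filter calls.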

Spark Schema RDD variables are now created for the training and test datasets by applying the schema variable created previously, using the SQL context's applySchema method:

  val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
  val testSchemaRDD = sqlContext.applySchema(testRDD, schema)

Temporary tables are created for the training and testing data:

  trainSchemaRDD.registerTempTable("trainingTable")
  testSchemaRDD.registerTempTable("testingTable")

Now, SQL is run against these temporary tables, both to select a subset of the columns and to potentially limit the data. A WHERE or LIMIT clause could also have been added. This is a useful approach because it lets us manipulate the data by both column and row:

  val schemaRddTrain = sqlContext.sql(
    """SELECT
      |age,workclass,education,maritalstatus,
      |occupation,relationship,race,
      |gender,hoursperweek,nativecountry,income
      |FROM trainingTable """.stripMargin)

  val schemaRddTest = sqlContext.sql(
    """SELECT
      |age,workclass,education,maritalstatus,
      |occupation,relationship,race,
      |gender,hoursperweek,nativecountry,income
      |FROM testingTable """.stripMargin)

The H2O data frames are now created from the data. The final column in each Dataset (income) is enumerated because this is the column that will form the Deep Learning label for the data. Also, enumerating this column forces the Deep Learning model to carry out classification rather than regression:

  val trainFrame:DataFrame = schemaRddTrain
  trainFrame.replace( trainFrame.find("income"), trainFrame.vec("income").toEnum)
  trainFrame.update(null)

  val testFrame:DataFrame = schemaRddTest
  testFrame.replace( testFrame.find("income"), testFrame.vec("income").toEnum)
  testFrame.update(null)
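To see why enumeration turns the task into classification, it helps to picture what enumerating a string column means: each distinct label is assigned a category index, and the column is then stored as those indices. The plain-Scala sketch below is conceptual only and is not H2O's implementation. The names LabelEncoding, buildDomain, and encode are hypothetical, and sorting the labels to fix the index order is an assumption made here for determinism:

```scala
object LabelEncoding {
  // Build a label -> index mapping from the distinct labels. Sorting makes
  // the assignment deterministic (the ordering rule is an assumption).
  def buildDomain(labels: Seq[String]): Map[String, Int] =
    labels.distinct.sorted.zipWithIndex.toMap

  // Replace each label by its category index, stored as a Double.
  def encode(labels: Seq[String], domain: Map[String, Int]): Seq[Double] =
    labels.map(label => domain(label).toDouble)

  def main(args: Array[String]): Unit = {
    val income = Seq("<=50K", ">50K", "<=50K")
    val domain = buildDomain(income)
    println(domain)
    println(encode(income, domain))
  }
}
```

With only a small, fixed set of category indices as possible targets, the model predicts a class rather than a continuous value.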

The enumerated income column of the test data is now saved so that its values can be used to score the model's predictions:

  val testResArray = schemaRddTest.collect()
  val sizeResults = testResArray.length
  var resArray = new Array[Double](sizeResults)

  for ( i <- 0 to ( resArray.length - 1)) {
    resArray(i) = testFrame.vec("income").at(i)
  }

The Deep Learning model parameters are now set up: the number of epochs (iterations), the datasets for training and validation, and the label column income, which will be used to classify the data.

Also, we chose to use variable importance to determine which data columns matter most. This parameter tells the model to calculate the effect of each feature on the target variable, so that after training, a table with that data can be plotted. The Deep Learning model is then created:

  val dlParams = new DeepLearningParameters()

  dlParams._epochs = 100
  dlParams._train = trainFrame
  dlParams._valid = testFrame
  dlParams._response_column = "income"
  dlParams._variable_importances = true
  val dl = new DeepLearning(dlParams)
  val dlModel = dl.trainModel.get

The model is then scored against the test Dataset for predictions, and these income predictions are compared to the previously stored enumerated test data income values. Finally, an accuracy percentage is output from the test data:

  val testH2oPredict = dlModel.score(schemaRddTest )('predict)
  val testPredictions = toRDD[DoubleHolder](testH2oPredict)
    .collect.map(_.result.getOrElse(Double.NaN))
  var resAccuracy = 0
  for ( i <- 0 to ( resArray.length - 1)) {
    if ( resArray(i) == testPredictions(i) )
      resAccuracy = resAccuracy + 1
  }

  println()
  println( ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" )
  println( ">>>>>> Model Test Accuracy = "
    + 100*resAccuracy / resArray.length + " % " )
  println( ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" )
  println()
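The accuracy calculation above uses integer arithmetic, so the printed percentage is truncated to a whole number. The comparison itself can be sketched in plain Scala without Spark or H2O, as shown next. The names AccuracyCheck and accuracy are hypothetical, the input values are made up purely for illustration, and returning a fraction as a Double is a choice made here rather than what the original code does:

```scala
object AccuracyCheck {
  // Fraction of positions where the prediction equals the actual label.
  def accuracy(actual: Seq[Double], predicted: Seq[Double]): Double = {
    require(actual.length == predicted.length, "length mismatch")
    require(actual.nonEmpty, "no rows to score")
    // NaN never equals anything, so a missing prediction counts as wrong.
    val correct = actual.zip(predicted).count { case (a, p) => a == p }
    correct.toDouble / actual.length
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values only, not results from the income data.
    val actual    = Seq(0.0, 1.0, 0.0, 0.0)
    val predicted = Seq(0.0, 1.0, 1.0, Double.NaN)
    println(f"Model Test Accuracy = ${100 * accuracy(actual, predicted)}%.1f %%")
  }
}
```

Zipping the two sequences avoids manual indexing, and the length check catches the case where the stored labels and the predictions are out of step.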

In the last step, H2O is terminated via a shutdown call, and then the Spark context is stopped:

  water.H2O.shutdown()
  sparkCxt.stop()

  println( " >>>>> Script Finished <<<<< " )

} // end application

Based on a training dataset of roughly 32,000 records and a test dataset of roughly 16,000 income records, this Deep Learning model reaches an accuracy of 83 percent, a reasonable result for a few lines of code, small datasets, and just 100 epochs, as the run output shows:

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> Model Test Accuracy = 83 %
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

There is still room to improve on this number. The next step would be hyper-parameter tuning to find a better parametrization of the neural network. To really outperform this number, however, a different neural network type has to be used, namely a convolutional neural network (CNN). Unfortunately, H2O lacks direct support for CNNs and relies on third-party integration with frameworks such as TensorFlow, Caffe, and MXNet. Deeplearning4j, which is also covered in this chapter, supports CNNs out of the box.

In the next section, we will examine some of the coding needed to process the MNIST data, even though that example could not be completed due to an H2O limitation at the time of coding.
