I covered the basics of the MLlib library in the previous chapter, but MLlib, at least at the time of writing this book, is more of a fast-moving target that is gaining ground than a well-structured implementation that everyone uses in production, or even one with consistent and tested documentation. In this situation, as the saying goes, rather than giving you the fish, I will focus on the well-established concepts behind the libraries and teach the process of fishing, so that the chapters do not need drastic changes with each new MLlib release. For better or worse, this increasingly seems to be a skill that a data scientist needs to possess.
Statistics and machine learning inherently deal with uncertainty, for one or another of the reasons we covered in Chapter 2, Data Pipelines and Modeling. While some datasets might be completely random, the goal here is to find trends, structure, and patterns beyond what a random number generator will provide you. The fundamental value of ML is that we can generalize these patterns and improve on at least some metrics. Let's see what basic tools are available within Scala/Spark.
In this chapter, I am covering supervised and unsupervised learning, the two historically different approaches. Supervised learning is traditionally used when we have a specific goal to predict a label, or a specific attribute of a dataset. Unsupervised learning can be used to understand internal structure and dependencies between any attributes of a dataset, and is often used to group the records or attributes in meaningful clusters. In practice, both methods can be used to complement and aid each other.
In this chapter, we will cover the following topics:
For the purpose of this chapter, a record is an observation or measurement of one or several attributes. We assume that the observations might contain noise (or be inaccurate for one reason or another):
While we believe that there is some pattern or correlation between the attributes, the one that we are after and want to uncover, the noise is uncorrelated across both the attributes and the records. In statistical terms, we say that the values for each record are independent and drawn from the same distribution (independent and identically distributed, or i.i.d.). The order of records does not matter. One of the attributes, usually the first, might be designated to be the label.
Supervised learning is when the goal is to predict the label yi:
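A common way to write this goal is shown below. The notation is an assumption made here: $x_{i1}, \dots, x_{iN}$ stand for the remaining attributes of record $i$, and $f$ is the function to be learned.

```latex
y_i = f(x_{i1}, x_{i2}, \dots, x_{iN})
```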
Here, N is the number of remaining attributes. In other words, the goal is to generalize the patterns so that we can predict the label by just knowing the other attributes, whether because we cannot physically get the measurement or just want to explore the structure of the dataset without having the immediate goal to predict the label.
Unsupervised learning is when we don't use the label: we just try to explore the structure and correlations to understand the dataset and, potentially, to predict the label better. The number of problems in this latter category has increased recently with the emergence of learning for unstructured data and streams, each of which I'll be covering later in the book in separate chapters.
I will demonstrate the concept of records and labels based on one of the most famous datasets in machine learning, the Iris dataset (https://archive.ics.uci.edu/ml/datasets/Iris). The Iris dataset contains 50 records for each of the three types of Iris flower: 150 lines in total, each with five fields. The first four fields of each line are measurements of sepal length, sepal width, petal length, and petal width (all in centimeters).
The final field is the type of the flower (setosa, versicolor, or virginica). The classic problem is to predict the label, which, in this case, is a categorical attribute with three possible values as a function of the first four attributes:
One option would be to draw a plane in the four-dimensional space that separates all three labels. Unfortunately, as one can find out, while one of the classes is clearly separable, the remaining two are not, as shown in the following multidimensional scatterplot (we have used Data Desk software to create it):
The colors and shapes are assigned according to the following table:
| Label | Color | Shape |
|---|---|---|
| Iris setosa | Blue | x |
| Iris versicolor | Green | Vertical bar |
| Iris virginica | Purple | Horizontal bar |
The Iris setosa is separable because it happens to have a very short petal length and width compared to the two other types.
Let's see how we can use MLlib to find that separating multidimensional plane.
Labeled datasets used to have a very special place in ML (we will discuss unsupervised learning, where we do not need a label, later in the chapter), so MLlib has a special data type to represent a record with a label: org.apache.spark.mllib.regression.LabeledPoint (refer to https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point). To read the Iris dataset from a text file, we need to transform the original UCI repository file into the so-called LIBSVM text format. While there are plenty of converters from CSV to LIBSVM format, I'd like to use a simple AWK script to do the job:

    awk -F, '
      /setosa/     {print "0 1:"$1" 2:"$2" 3:"$3" 4:"$4}
      /versicolor/ {print "1 1:"$1" 2:"$2" 3:"$3" 4:"$4}
      /virginica/  {print "1 1:"$1" 2:"$2" 3:"$3" 4:"$4}
    ' iris.csv > iris-libsvm.txt

Why do we need LIBSVM format?
LIBSVM is a format that many libraries use. First, LIBSVM takes only continuous attributes. While a lot of datasets in the real world contain discrete or categorical attributes, internally they are always converted to a numerical representation for efficiency reasons, even if the L1 or L2 metrics on the resulting numerical attribute do not make much sense for unordered discrete values. Second, the LIBSVM format allows for efficient sparse data representation. While the Iris dataset is not sparse, almost all of the modern big data sources are sparse, and the format allows for efficient storage by only storing the provided values. Many modern big data key-value stores and traditional RDBMS databases actually do the same for efficiency reasons.
The code might be more complex for missing values, but we know that the Iris dataset is not sparse—otherwise we'd complement our code with a bunch of if statements. We mapped the last two labels to 1 for our purpose now.
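To make the sparse representation concrete, here is a minimal sketch in plain Scala of how one LIBSVM line could be parsed. The helpers `parseLibSVM` and `toDense` are hypothetical names introduced for illustration; in the session below, MLlib's `MLUtils.loadLibSVMFile` does this job and its implementation may differ.

```scala
// Hypothetical helper: parse "<label> <index>:<value> ..." where indices are 1-based.
// Only the indices present on the line are stored, which is what makes the format sparse.
def parseLibSVM(line: String): (Double, Map[Int, Double]) = {
  val tokens = line.trim.split("\\s+")
  val label = tokens.head.toDouble
  val features = tokens.tail.map { token =>
    val parts = token.split(":")
    (parts(0).toInt - 1) -> parts(1).toDouble // convert to a zero-based index
  }.toMap
  (label, features)
}

// Hypothetical helper: expand the sparse map into a dense array, filling gaps with 0.0
def toDense(features: Map[Int, Double], size: Int): Array[Double] =
  Array.tabulate(size)(i => features.getOrElse(i, 0.0))

val (label, features) = parseLibSVM("0 1:5.1 2:3.5 3:1.4 4:0.2")
val sparseRow = parseLibSVM("1 2:3.0 4:1.5")._2 // attributes 1 and 3 are absent
val denseRow = toDense(sparseRow, 4)
```

A line that omits some indices, such as the second one here, is simply treated as having zeros in those positions.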
Now, let's run the Linear Support Vector Machine (SVM) SVMWithSGD code from MLlib:

    $ bin/spark-shell
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
          /_/

    Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
    Type in expressions to have them evaluated.
    Type :help for more information.
    Spark context available as sc.
    SQL context available as sqlContext.

    scala> import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
    import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}

    scala> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

    scala> import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.mllib.util.MLUtils

    scala> val data = MLUtils.loadLibSVMFile(sc, "iris-libsvm.txt")
    data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[6] at map at MLUtils.scala:112

    scala> val splits = data.randomSplit(Array(0.6, 0.4), seed = 123L)
    splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:26, MapPartitionsRDD[8] at randomSplit at <console>:26)

    scala> val training = splits(0).cache()
    training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[7] at randomSplit at <console>:26

    scala> val test = splits(1)
    test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[8] at randomSplit at <console>:26

    scala> val numIterations = 100
    numIterations: Int = 100

    scala> val model = SVMWithSGD.train(training, numIterations)
    model: org.apache.spark.mllib.classification.SVMModel = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 4, numClasses = 2, threshold = 0.0

    scala> model.clearThreshold()
    res0: model.type = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 4, numClasses = 2, threshold = None

    scala> val scoreAndLabels = test.map { point =>
         |   val score = model.predict(point.features)
         |   (score, point.label)
         | }
    scoreAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[212] at map at <console>:36

    scala> val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    metrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@692e4a35

    scala> val auROC = metrics.areaUnderROC()
    auROC: Double = 1.0

    scala> println("Area under ROC = " + auROC)
    Area under ROC = 1.0

    scala> model.save(sc, "model")
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

So, you have just run one of the most complex algorithms in the machine learning toolbox: SVM. The result is a separating plane that distinguishes Iris setosa flowers from the other two types. The model in this case is exactly the intercept and the coefficients of the plane that best separates the labels:

    scala> model.intercept
    res5: Double = 0.0

    scala> model.weights
    res6: org.apache.spark.mllib.linalg.Vector = [-0.2469448809675877,-1.0692729424287566,1.7500423423258127,0.8105712661836376]

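As a rough illustration of how such a plane is used, the following plain Scala sketch applies the printed weights and intercept to two records. The `score` and `predict` helpers are hypothetical names, the two sample records are typical Iris measurements chosen for illustration, and the thresholding rule (label 1 when the score exceeds the threshold) is an assumption about how a linear model of this kind is usually applied rather than a copy of MLlib's code.

```scala
// Weights and intercept copied from the model output above
val weights = Array(-0.2469448809675877, -1.0692729424287566,
                    1.7500423423258127, 0.8105712661836376)
val intercept = 0.0

// Raw score: signed position of the record relative to the separating plane
def score(features: Array[Double]): Double =
  weights.zip(features).map { case (w, x) => w * x }.sum + intercept

// Assumed decision rule: label 1.0 when the score exceeds the threshold
def predict(features: Array[Double], threshold: Double = 0.0): Double =
  if (score(features) > threshold) 1.0 else 0.0

// Illustrative records: sepal length, sepal width, petal length, petal width
val setosa = Array(5.1, 3.5, 1.4, 0.2)
val versicolor = Array(7.0, 3.2, 4.7, 1.4)

println(s"setosa score = ${score(setosa)}, label = ${predict(setosa)}")
println(s"versicolor score = ${score(versicolor)}, label = ${predict(versicolor)}")
```

The setosa record gets a negative score and the versicolor record a positive one, which matches the 0/1 labels assigned by the AWK script.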
If one looks under the hood, the model is stored in a parquet file, which can be dumped using parquet-tools:

    $ parquet-tools dump model/data/part-r-00000-7a86b825-569d-4c80-8796-8ee6972fd3b1.gz.parquet
    …
    DOUBLE weights.values.array
    --------------------------------------------------------------------------------
    *** row group 1 of 1, values 1 to 4 ***
    value 1: R:0 D:3 V:-0.2469448809675877
    value 2: R:1 D:3 V:-1.0692729424287566
    value 3: R:1 D:3 V:1.7500423423258127
    value 4: R:1 D:3 V:0.8105712661836376

    DOUBLE intercept
    --------------------------------------------------------------------------------
    *** row group 1 of 1, values 1 to 1 ***
    value 1: R:0 D:1 V:0.0
    …

The Receiver Operating Characteristic (ROC) is a common measure of a classifier's ability to rank records correctly according to their numeric label. We will consider precision metrics in more detail in Chapter 9, NLP in Scala.
What is ROC?
ROC emerged in signal processing, where it was first applied to measure the accuracy of analog radars. A common measure of accuracy is the area under the ROC curve (AUROC), which, in short, is the probability that two randomly chosen points with different labels are ranked correctly according to their labels (the 0 label should always have a lower rank than the 1 label). AUROC has a number of attractive characteristics:
Of course, separating the remaining two labels is a harder problem, since a plane that separates Iris versicolor from Iris virginica does not exist: the AUROC score will be less than 1.0. However, the SVM method will find the plane that best differentiates between the latter two classes.
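The pairwise-ranking reading of AUROC can be sketched directly in plain Scala. This is a naive quadratic illustration under the assumption that labels are exactly 0.0 or 1.0 and that ties count as half; the function name `pairwiseAuROC` is hypothetical, and this is not how `BinaryClassificationMetrics` computes the value.

```scala
// Fraction of (positive, negative) pairs in which the positive record
// receives the higher score; a tie counts as half a correct ranking.
def pairwiseAuROC(scoreAndLabels: Seq[(Double, Double)]): Double = {
  val positives = scoreAndLabels.collect { case (s, l) if l == 1.0 => s }
  val negatives = scoreAndLabels.collect { case (s, l) if l == 0.0 => s }
  require(positives.nonEmpty && negatives.nonEmpty, "need both labels")
  val ranked = for (p <- positives; n <- negatives) yield
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  ranked.sum / ranked.size
}

// Perfectly ranked scores: every 1-labeled record scores above every 0-labeled one
val perfect = Seq((-2.4, 0.0), (-1.9, 0.0), (4.2, 1.0), (3.1, 1.0))

// One of the four positive/negative pairs is ranked in the wrong order
val imperfect = Seq((0.1, 0.0), (0.4, 0.0), (0.35, 1.0), (0.8, 1.0))

println(pairwiseAuROC(perfect))   // 1.0
println(pairwiseAuROC(imperfect)) // 0.75
```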
Logistic regression is one of the oldest classification methods. The outcome of the logistic regression is also a set of weights, which define the hyperplane, but the loss function is the logistic loss instead of the hinge loss used by the SVM:
The logit function is a frequent choice when the label is binary (as y = +/- 1 in the above equation):
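For reference, with labels encoded as $y \in \{-1, +1\}$, the logistic loss for a single record is commonly written as follows. The symbols $\mathbf{w}$ (weight vector) and $\mathbf{x}$ (feature vector) are notation assumed here.

```latex
L(\mathbf{w}; \mathbf{x}, y) = \log\left(1 + \exp\left(-y\,\mathbf{w}^{T}\mathbf{x}\right)\right)
```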

    $ bin/spark-shell
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
          /_/

    Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
    Type in expressions to have them evaluated.
    Type :help for more information.
    Spark context available as sc.
    SQL context available as sqlContext.

    scala> import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext

    scala> import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
    import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}

    scala> import org.apache.spark.mllib.evaluation.MulticlassMetrics
    import org.apache.spark.mllib.evaluation.MulticlassMetrics

    scala> import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.regression.LabeledPoint

    scala> import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.Vectors

    scala> import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.mllib.util.MLUtils

    scala> val data = MLUtils.loadLibSVMFile(sc, "iris-libsvm-3.txt")
    data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[6] at map at MLUtils.scala:112

    scala> val splits = data.randomSplit(Array(0.6, 0.4))
    splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:29, MapPartitionsRDD[8] at randomSplit at <console>:29)

    scala> val training = splits(0).cache()
    training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[7] at randomSplit at <console>:29

    scala> val test = splits(1)
    test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[8] at randomSplit at <console>:29

    scala> val model = new LogisticRegressionWithLBFGS().setNumClasses(3).run(training)
    model: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 8, numClasses = 3, threshold = 0.5

    scala> val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
         |   val prediction = model.predict(features)
         |   (prediction, label)
         | }
    predictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[67] at map at <console>:37

    scala> val metrics = new MulticlassMetrics(predictionAndLabels)
    metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@6d5254f3

    scala> val precision = metrics.precision
    precision: Double = 0.9516129032258065

    scala> println("Precision = " + precision)
    Precision = 0.9516129032258065

    scala> model.intercept
    res5: Double = 0.0

    scala> model.weights
    res7: org.apache.spark.mllib.linalg.Vector = [10.644978886788556,-26.850171485157578,3.852594349297618,8.74629386938248,4.288703063075211,-31.029289381858273,9.790312529377474,22.058196856491996]

The labels in this case can be any integer in the range [0, k), where k is the total number of classes. The correct class is determined by building multiple binary logistic regression models against the pivot class, which in this case is the class with the 0 label (see The Elements of Statistical Learning by Trevor Hastie, Robert Tibshirani, Jerome Friedman, Springer Series in Statistics).
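The pivot-class scheme can be sketched in plain Scala using the eight weights printed above. Two assumptions are made here and should be checked against the MLlib version in use: the weights are laid out as (k - 1) consecutive blocks of four, one block per non-pivot class, and the label mapping in iris-libsvm-3.txt is setosa = 0, versicolor = 1, virginica = 2. The helper `predictClass` and the sample records are illustrative.

```scala
// Weights copied from model.weights above: (k - 1) * n values with k = 3, n = 4
val weights = Array(
  10.644978886788556, -26.850171485157578, 3.852594349297618, 8.74629386938248,
  4.288703063075211, -31.029289381858273, 9.790312529377474, 22.058196856491996)
val numFeatures = 4
val numClasses = 3

// Each non-pivot class gets a margin against pivot class 0, whose margin is fixed at 0.
// The predicted class is the one with the largest margin (assumed layout: one block per class).
def predictClass(features: Array[Double]): Int = {
  val margins = (0 until numClasses - 1).map { c =>
    (0 until numFeatures).map(j => weights(c * numFeatures + j) * features(j)).sum
  }
  val (bestMargin, bestIndex) = margins.zipWithIndex.maxBy(_._1)
  if (bestMargin > 0.0) bestIndex + 1 else 0
}

// Illustrative records: sepal length, sepal width, petal length, petal width
val setosa = Array(5.1, 3.5, 1.4, 0.2)
val versicolor = Array(7.0, 3.2, 4.7, 1.4)
val virginica = Array(6.3, 3.3, 6.0, 2.5)

println(Seq(setosa, versicolor, virginica).map(predictClass))
```

Under these assumptions both margins are negative for the setosa record, so the pivot class wins, while the other two records each favor a different non-pivot class.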
The accuracy metric is precision, or the percentage of records predicted correctly (which is 95% in our case).
The preceding two methods describe linear models. Unfortunately, the linear approach does not always work for complex interactions between attributes. Assume that the label looks like an exclusive OR: 0 if X ≠ Y and 1 if X = Y:
| X | Y | Label |
|---|---|---|
| 1 | 0 | 0 |
| 0 | 1 | 0 |
| 1 | 1 | 1 |
| 0 | 0 | 1 |
There is no hyperplane that can differentiate between the two labels in the XY space. A recursive split solution, where the split on each level is made on only one variable or a linear combination thereof, might work a bit better in this case. Decision trees are also known to work well with sparse and interaction-rich datasets:

    $ bin/spark-shell
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
          /_/

    Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
    Type in expressions to have them evaluated.
    Type :help for more information.
    Spark context available as sc.
    SQL context available as sqlContext.

    scala> import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.mllib.tree.DecisionTree

    scala> import org.apache.spark.mllib.tree.model.DecisionTreeModel
    import org.apache.spark.mllib.tree.model.DecisionTreeModel

    scala> import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.mllib.util.MLUtils

    scala> import org.apache.spark.mllib.tree.configuration.Strategy
    import org.apache.spark.mllib.tree.configuration.Strategy

    scala> import org.apache.spark.mllib.tree.configuration.Algo.Classification
    import org.apache.spark.mllib.tree.configuration.Algo.Classification

    scala> import org.apache.spark.mllib.tree.impurity.{Entropy, Gini}
    import org.apache.spark.mllib.tree.impurity.{Entropy, Gini}

    scala> val data = MLUtils.loadLibSVMFile(sc, "iris-libsvm-3.txt")
    data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[6] at map at MLUtils.scala:112

    scala> val splits = data.randomSplit(Array(0.7, 0.3), 11L)
    splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:30, MapPartitionsRDD[8] at randomSplit at <console>:30)

    scala> val (trainingData, testData) = (splits(0), splits(1))
    trainingData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[7] at randomSplit at <console>:30
    testData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[8] at randomSplit at <console>:30

    scala> val strategy = new Strategy(Classification, Gini, 10, 3, 10)
    strategy: org.apache.spark.mllib.tree.configuration.Strategy = org.apache.spark.mllib.tree.configuration.Strategy@4110e631

    scala> val dt = new DecisionTree(strategy)
    dt: org.apache.spark.mllib.tree.DecisionTree = org.apache.spark.mllib.tree.DecisionTree@33d89052

    scala> val model = dt.run(trainingData)
    model: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 6 with 21 nodes

    scala> val labelAndPreds = testData.map { point =>
         |   val prediction = model.predict(point.features)
         |   (point.label, prediction)
         | }
    labelAndPreds: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[32] at map at <console>:36

    scala> val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
    testErr: Double = 0.02631578947368421

    scala> println("Test Error = " + testErr)
    Test Error = 0.02631578947368421

    scala> println("Learned classification tree model:\n" + model.toDebugString)
    Learned classification tree model:
    DecisionTreeModel classifier of depth 6 with 21 nodes
      If (feature 3 <= 0.4)
       Predict: 0.0
      Else (feature 3 > 0.4)
       If (feature 3 <= 1.7)
        If (feature 2 <= 4.9)
         If (feature 0 <= 5.3)
          If (feature 1 <= 2.8)
           If (feature 2 <= 3.9)
            Predict: 1.0
           Else (feature 2 > 3.9)
            Predict: 2.0
          Else (feature 1 > 2.8)
           Predict: 0.0
         Else (feature 0 > 5.3)
          Predict: 1.0
        Else (feature 2 > 4.9)
         If (feature 0 <= 6.0)
          If (feature 1 <= 2.4)
           Predict: 2.0
          Else (feature 1 > 2.4)
           Predict: 1.0
         Else (feature 0 > 6.0)
          Predict: 2.0
       Else (feature 3 > 1.7)
        If (feature 2 <= 4.9)
         If (feature 1 <= 3.0)
          Predict: 2.0
         Else (feature 1 > 3.0)
          Predict: 1.0
        Else (feature 2 > 4.9)
         Predict: 2.0

    scala> model.save(sc, "dt-model")
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

As you can see, the error (misprediction) rate on the hold-out sample is only 2.6%. A 30% sample of 150 records would be about 45 records, but randomSplit only approximates the requested proportions: the reported error of 0.0263 is consistent with 1 missed record out of 38 in the test set. Certainly, the result might and will change with a different seed, and we need a more rigorous cross-validation technique to prove the accuracy of the model, but this is enough for a rough estimate of model performance.
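To make the earlier exclusive-OR-style table concrete, here is a small plain Scala sketch. It searches a grid of integer coefficients for a linear rule that reproduces the table (an illustration over a finite grid, not a proof) and then checks a hand-written two-level split. The names `linearSeparates` and `treePredict` are hypothetical.

```scala
// Rows of the table: (x, y, label), where the label is 1 when x == y
val rows = Seq((1, 0, 0), (0, 1, 0), (1, 1, 1), (0, 0, 1))

// A linear rule predicts 1 when a*x + b*y + c > 0
def linearSeparates(a: Int, b: Int, c: Int): Boolean =
  rows.forall { case (x, y, label) =>
    (if (a * x + b * y + c > 0) 1 else 0) == label
  }

// Try every integer coefficient triple in a small grid
val grid = -5 to 5
val anyLinearRule =
  (for (a <- grid; b <- grid; c <- grid) yield linearSeparates(a, b, c)).contains(true)

// A recursive split: first on x, then on y within each branch
def treePredict(x: Int, y: Int): Int =
  if (x <= 0) { if (y <= 0) 1 else 0 }
  else { if (y <= 0) 0 else 1 }

val treeIsCorrect = rows.forall { case (x, y, label) => treePredict(x, y) == label }

println(s"linear rule found: $anyLinearRule, two-level tree correct: $treeIsCorrect")
```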
Decision trees generalize to the regression case, that is, when the label is continuous in nature. In this case, the splitting criterion is minimization of weighted variance, as opposed to entropy gain or Gini impurity in the case of classification. I will talk more about the differences in Chapter 5, Regression and Classification.
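The three splitting criteria mentioned above can be sketched in a few lines of plain Scala. The function names are hypothetical and the formulas are the textbook definitions (Gini as one minus the sum of squared class proportions, entropy in bits, population variance); MLlib's own implementations may differ in details. The class counts assume the full 150-record Iris dataset and an idealized split that isolates all setosa records.

```scala
// Gini impurity from per-class record counts
def gini(counts: Seq[Double]): Double = {
  val total = counts.sum
  if (total == 0.0) 0.0
  else 1.0 - counts.map(c => (c / total) * (c / total)).sum
}

// Entropy in bits from per-class record counts (empty classes contribute nothing)
def entropy(counts: Seq[Double]): Double = {
  val total = counts.sum
  if (total == 0.0) 0.0
  else counts.filter(_ > 0.0).map { c =>
    val p = c / total
    -p * math.log(p) / math.log(2.0)
  }.sum
}

// Population variance of continuous labels, the regression counterpart
def variance(labels: Seq[Double]): Double = {
  val mean = labels.sum / labels.size
  labels.map(y => (y - mean) * (y - mean)).sum / labels.size
}

// Impurity of a split: child impurities weighted by child size
def weightedGini(left: Seq[Double], right: Seq[Double]): Double = {
  val (nl, nr) = (left.sum, right.sum)
  (nl * gini(left) + nr * gini(right)) / (nl + nr)
}

val parent = Seq(50.0, 50.0, 50.0) // setosa, versicolor, virginica
val left = Seq(50.0, 0.0, 0.0)     // idealized branch with setosa only
val right = Seq(0.0, 50.0, 50.0)   // the two remaining classes
val giniGain = gini(parent) - weightedGini(left, right)

println(f"parent gini = ${gini(parent)}%.4f, gain = $giniGain%.4f")
```

The pure branch has zero impurity, so the whole gain of about one third comes from removing setosa from the mix.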
There are a number of parameters, which can be tuned to improve the performance:
As a portfolio of stocks has better characteristics compared to individual equities, models can be combined to produce better classifiers. Usually, these methods work really well with decision trees as the training technique can be modified to produce models with large variations. One way is to train the model on random subsets of the original data or random subsets of attributes, which is called random forest. Another way is to generate a sequence of models, where misclassified instances are reweighted to get a larger weight in each subsequent iteration. It has been shown that this method has a relation to gradient descent methods in the model parameter space. While these are valid and interesting techniques, they usually require much more space in terms of model storage and are less interpretable compared to bare decision tree models. For Spark, the ensemble models are currently under development—the umbrella issue is SPARK-3703 (https://issues.apache.org/jira/browse/SPARK-3703).
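The basic intuition that combined models can outperform their members can be shown with a toy majority vote in plain Scala. The three classifiers below are hypothetical hand-written rules, each wrong on a different record; this is equal-weight voting only, and it does not implement the random forest or boosting training procedures described above.

```scala
type Classifier = Int => Int

// Toy dataset: a single integer attribute 1..6, label 1 when the attribute is at least 4
val data: Seq[(Int, Int)] = (1 to 6).map(x => (x, if (x >= 4) 1 else 0))

// Three hypothetical weak classifiers, each making one mistake on a different record
val c1: Classifier = x => if (x >= 3) 1 else 0           // wrong on x = 3
val c2: Classifier = x => if (x >= 5) 1 else 0           // wrong on x = 4
val c3: Classifier = x => if (x >= 4 && x != 6) 1 else 0 // wrong on x = 6

// Equal-weight majority vote over binary classifiers
def majorityVote(models: Seq[Classifier]): Classifier =
  x => if (models.count(m => m(x) == 1) * 2 > models.size) 1 else 0

// Number of misclassified records for a given model
def errors(model: Classifier): Int =
  data.count { case (x, label) => model(x) != label }

val ensemble = majorityVote(Seq(c1, c2, c3))

println(Seq(c1, c2, c3).map(errors)) // one error each
println(errors(ensemble))            // no errors
```

The vote works here because the individual mistakes do not overlap, which is the same reason the training procedures above try to produce models with large variations.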