Pipeline components

Pipelines consist of a set of components joined together such that the DataFrame produced by one component is used as input for the next component. The components available are split into two classes: transformers and estimators.

Transformers

Transformers transform one DataFrame into another, normally by appending one or more columns.

The first step in our spam classification algorithm is to split each message into an array of words. This is called tokenization. We can use the Tokenizer transformer, provided by MLlib:

scala> import org.apache.spark.ml.feature._
import org.apache.spark.ml.feature._

scala> val tokenizer = new Tokenizer()
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_75559f60e8cf 

The behavior of transformers can be customized through getters and setters. The easiest way of obtaining a list of the parameters available is to call the .explainParams method:

scala> println(tokenizer.explainParams)
inputCol: input column name (undefined)
outputCol: output column name (default: tok_75559f60e8cf__output)

We see that the behavior of a Tokenizer instance can be customized using two parameters: inputCol and outputCol, which give the name of the column containing the input (the string to be tokenized) and the name of the column that will hold the output (the array of words), respectively. We can set these parameters using the setInputCol and setOutputCol methods.

We set inputCol to "text", since that is what the column is called in our training and test DataFrames. We will set outputCol to "words":

scala> tokenizer.setInputCol("text").setOutputCol("words")
org.apache.spark.ml.feature.Tokenizer = tok_75559f60e8cf

In due course, we will integrate tokenizer into a pipeline, but for now, let's just use it to transform the training DataFrame to verify that it works correctly.

scala> val tokenizedDF = tokenizer.transform(trainDF)
tokenizedDF: DataFrame = [fileName: string, text: string, category: string, words: array<string>]

scala> tokenizedDF.show
+--------------+----------------+--------+--------------------+
|      fileName|            text|category|               words|
+--------------+----------------+--------+--------------------+
|file:/Users...|Subject: auto...|    spam|[subject:, auto, ...|
|file:/Users...|Subject: want...|    spam|[subject:, want, ...|
|file:/Users...|Subject: n't ...|    spam|[subject:, n't, m...|
|file:/Users...|Subject: amaz...|    spam|[subject:, amaze,...|
|file:/Users...|Subject: help...|    spam|[subject:, help, ...|
|file:/Users...|Subject: beat...|    spam|[subject:, beat, ...|
|...                                                          |
+--------------+----------------+--------+--------------------+

The tokenizer transformer produces a new DataFrame with an additional column, words, containing an array of the words in the text column.

Clearly, we can use our tokenizer to transform any DataFrame with the correct schema. We could, for instance, use it on the test set. Much of machine learning involves calling the same (or a very similar) pipeline on different data sets. By providing the pipeline abstraction, MLlib facilitates reasoning about complex machine learning algorithms consisting of many cleaning, transformation, and modeling components.
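
As a quick sketch (assuming testDF carries the same "text" column as trainDF), reusing the tokenizer on the test set is a one-liner:

val tokenizedTestDF = tokenizer.transform(testDF)
// Appends a "words" column to testDF, exactly as it did for trainDF.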

The next step in our pipeline is to calculate the frequency of occurrence of each word in each message. We will eventually use these frequencies as features in our algorithm. We will use the HashingTF transformer to transform from arrays of words to word frequency vectors for each message.

The HashingTF transformer constructs a sparse vector of word frequencies from input iterables. Each element in the word array is transformed to a hash code. This hash code is then reduced, modulo n, to a value between 0 and n-1, where n is the total number of elements in the output vector. The term frequency vector just counts how many times each reduced hash value occurs.

Let's run through an example manually to understand how this works. We will calculate the term frequency vector for Array("the", "dog", "jumped", "over", "the"). Let's set n, the number of elements in the sparse output vector, to 16 for this example. The first step is to calculate the hash code for each element in our array. We can use the built-in ## method, which calculates a hash code for any object:

scala> val words = Array("the", "dog", "jumped", "over", "the")
words: Array[String] = Array(the, dog, jumped, over, the)

scala> val hashCodes = words.map { _.## }
hashCodes: Array[Int] = Array(114801, 99644, -1148867251, 3423444, 114801)

To transform the hash codes into valid vector indices, we take the modulo of each hash by the size of the vector (16, in this case):

scala> val indices = hashCodes.map { code => Math.abs(code % 16) }
indices: Array[Int] = Array(1, 12, 3, 4, 1)

We can then create a mapping from indices to the number of times that index appears:

scala> val indexFrequency = indices.groupBy(identity).mapValues {
  _.size.toDouble
}
indexFrequency: Map[Int,Double] = Map(4 -> 1.0, 1 -> 2.0, 3 -> 1.0, 12 -> 1.0)

Finally, we can convert this map to a sparse vector, where the value at each element in the vector is the frequency with which this particular index occurs:

scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._

scala> val termFrequencies = Vectors.sparse(16, indexFrequency.toSeq)
termFrequencies: linalg.Vector = (16,[1,3,4,12],[2.0,1.0,1.0,1.0])

Note that the .toString output for a sparse vector consists of three elements: the total size of the vector, followed by a list of indices and a list of the values at those indices.
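
To make the index/value pairing concrete, we can expand the sparse vector into a plain array with the toArray method (the values below follow directly from the indexFrequency map we just built):

termFrequencies.toArray
// Array(0.0, 2.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0,
//       0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)
// Position i holds the frequency of bucket i: 2.0 at index 1; 1.0 at 3, 4 and 12.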

Using a sparse vector provides a compact and efficient way of representing the frequency of occurrence of words in the message, and is exactly how HashingTF works under the hood. The disadvantage is that the mapping from words to indices is not necessarily unique: truncating hash codes by the length of the vector will map different strings to the same index. This is known as a collision. The solution is to make n large enough that the frequency of collisions is minimized.
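
We can watch collisions happen with plain Scala. A small sketch, using a deliberately tiny vector size so that clashes are likely:

val n = 16
val vocabulary = Seq("the", "dog", "jumped", "over", "lazy", "fox")
val byBucket = vocabulary.groupBy { word => Math.abs(word.## % n) }
val collisions = byBucket.filter { case (_, words) => words.size > 1 }
// collisions maps each bucket index to the distinct words sharing it; with
// these particular words, "over" and "lazy" end up in the same bucket.
// Rerunning with a larger n shrinks (and eventually empties) this map.

With the much larger vector sizes used in practice, such clashes become rare.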

Tip

HashingTF is similar to building a hash table (for example, a Scala map) whose keys are words and whose values are the number of times that word occurs in the message, with one important difference: it does not attempt to deal with hash collisions. Thus, if two words map to the same hash, they will have the wrong frequency. There are two advantages to using this algorithm over just constructing a hash table:

  • We do not have to maintain a list of distinct words in memory.
  • Each e-mail can be transformed to a vector independently of all others: we do not have to reduce over different partitions to get the set of keys in the map. This greatly eases applying this algorithm to each e-mail in a distributed manner, since we can apply the HashingTF transformation on each partition independently.

The main disadvantage is that we must use machine learning algorithms that can take advantage of the sparse representation efficiently. This is the case with logistic regression, which we will use here.

As you might expect, the HashingTF transformer takes, as parameters, the input and output columns. It also takes a parameter defining the number of distinct hash buckets in the vector. Increasing the number of buckets decreases the number of collisions. In practice, a large power of two is recommended; here, we will use 2^20 = 1,048,576 buckets.

scala> val hashingTF = (new HashingTF()
  .setInputCol("words")
  .setOutputCol("features")
  .setNumFeatures(1048576))
hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_3b78eca9595c

scala> val hashedDF = hashingTF.transform(tokenizedDF)
hashedDF: DataFrame = [fileName: string, text: string, category: string, words: array<string>, features: vector]

scala> hashedDF.select("features").show
+--------------------+
|            features|
+--------------------+
|(1048576,[0,33,36...|
|(1048576,[0,36,40...|
|(1048576,[0,33,34...|
|(1048576,[0,33,36...|
|(1048576,[0,33,34...|
|(1048576,[0,33,34...|
+--------------------+

Each element in the features column is a sparse vector:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val firstRow = hashedDF.select("features").first
firstRow: org.apache.spark.sql.Row = ...

scala> val Row(v:Vector) = firstRow
v: Vector = (1048576,[0,33,36,37,...],[1.0,3.0,4.0,1.0,...])

We can thus interpret our vector as follows: the word that hashes to element 33 occurs three times, the word that hashes to element 36 occurs four times, and so on.
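
We can check this reading directly, since Vector entries can be read by index. A quick sanity check on the first row's feature vector (the values follow the sample output above):

v(33)  // 3.0: the word hashing to bucket 33 occurs three times
v(36)  // 4.0: the word hashing to bucket 36 occurs four times
v(1)   // 0.0: no word in this message hashes to bucket 1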

Estimators

We now have the features ready for logistic regression. The last step prior to running logistic regression is to create the target variable. We will transform the category column in our DataFrame to a binary 0/1 target column. Spark provides a StringIndexer class that replaces a set of strings in a column with doubles. A StringIndexer is not a transformer: it must first be 'fitted' to a set of categories to calculate the mapping from string to numeric value. This introduces the second class of components in the pipeline API: estimators.

Unlike a transformer, which works "out of the box", an estimator must be fitted to a DataFrame. For our string indexer, the fitting process involves obtaining the list of unique strings ("spam" and "ham") and mapping each of these to a double. The fitting process outputs a transformer which can be used on subsequent DataFrames.

scala> val indexer = (new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label"))
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_16db03fd0546

scala> val indexTransform = indexer.fit(trainDF)
indexTransform: StringIndexerModel = strIdx_16db03fd0546

The transformer produced by the fitting process has a labels attribute describing the mapping it applies:

scala> indexTransform.labels
Array[String] = Array(ham, spam)

Each label will get mapped to its index in the array: thus, our transformer maps ham to 0 and spam to 1:

scala> val labelledDF = indexTransform.transform(hashedDF)
labelledDF: org.apache.spark.sql.DataFrame = [fileName: string, text: string, category: string, words: array<string>, features: vector, label: double]

scala> labelledDF.select("category", "label").distinct.show
+--------+-----+
|category|label|
+--------+-----+
|     ham|  0.0|
|    spam|  1.0|
+--------+-----+

We now have the feature vectors and classification labels in the correct format for logistic regression. The component for performing logistic regression is an estimator: it is fitted to a training DataFrame to create a trained model. The model can then be used to transform test DataFrames.

scala> import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegression

scala> val classifier = new LogisticRegression().setMaxIter(50)
classifier: LogisticRegression = logreg_a5e921e7c1a1 

By default, the LogisticRegression estimator expects the feature column to be named "features" and the label column (the target) to be named "label". There is no need to set these explicitly, since they match the column names set by hashingTF and indexer. There are several parameters that can be set to control how logistic regression works:

scala> println(classifier.explainParams)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100, current: 50)
regParam: regularization parameter (>= 0) (default: 0.0)
threshold: threshold in binary classification prediction, in range [0, 1] (default: 0.5)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
...
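
Had the feature or label columns been named differently, we could have pointed the estimator at them explicitly through the featuresCol and labelCol parameters. A sketch with hypothetical column names:

val customClassifier = new LogisticRegression()
  .setFeaturesCol("tfVector")  // hypothetical feature column name
  .setLabelCol("isSpam")       // hypothetical label column name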

For now, we just set the maxIter parameter. We will look at the effect of other parameters, such as regularization, later on. Let's now fit the classifier to labelledDF:

scala> val trainedClassifier = classifier.fit(labelledDF)
trainedClassifier: LogisticRegressionModel = logreg_353d18f6a5f0

This produces a transformer that we can use on a DataFrame with a features column. The transformer appends a prediction column and a probability column. We can, for instance, use trainedClassifier to transform labelledDF, the training set itself:

scala> val labelledDFWithPredictions = trainedClassifier.transform(
  labelledDF)
labelledDFWithPredictions: DataFrame = [fileName: string, ...

scala> labelledDFWithPredictions.select($"label", $"prediction").show
+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+

A quick way of checking the performance of our model is to just count the number of misclassified messages:

scala> labelledDFWithPredictions.filter { 
  $"label" !== $"prediction" 
}.count
Long = 1

In this case, logistic regression managed to correctly classify every message but one in the training set. This is perhaps unsurprising, given the large number of features and the relatively clear demarcation between the words used in spam and legitimate e-mails.

Of course, the real test of a model is not how well it performs on the training set, but how well it performs on a test set. To test this, we could just push the test DataFrame through the same stages that we used to train the model, replacing estimators with the fitted transformers that they produced. MLlib provides the pipeline abstraction to facilitate this: we wrap an ordered list of transformers and estimators in a pipeline. This pipeline is then fitted to a DataFrame corresponding to the training set. The fitting produces a PipelineModel instance, equivalent to the pipeline but with estimators replaced by transformers, as shown in this diagram:

[Diagram: fitting a pipeline of transformers and estimators to the training set produces a PipelineModel in which each estimator has been replaced by the transformer it produced]

Let's construct the pipeline for our logistic regression spam filter:

scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline

scala> val pipeline = new Pipeline().setStages(
  Array(indexer, tokenizer, hashingTF, classifier)
)
pipeline: Pipeline = pipeline_7488113e284d

Once the pipeline is defined, we fit it to the DataFrame holding the training set:

scala> val fittedPipeline = pipeline.fit(trainDF)
fittedPipeline: org.apache.spark.ml.PipelineModel = pipeline_089525c6f100

When fitting a pipeline to a DataFrame, estimators and transformers are treated differently:

  • Transformers are applied to the DataFrame and copied, as is, into the pipeline model.
  • Estimators are fitted to the DataFrame, producing a transformer. The transformer is then applied to the DataFrame, and appended to the pipeline model (see the sketch below).
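
We can verify this by inspecting the stages of the fitted pipeline: every stage is now a transformer, with the estimators replaced by the models they produced. A quick sketch:

fittedPipeline.stages.foreach { stage =>
  println(stage.getClass.getSimpleName)
}
// Prints StringIndexerModel, Tokenizer, HashingTF and LogisticRegressionModel:
// the two estimators have been fitted, the two transformers copied as-is.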

We can now apply the pipeline model to the test set:

scala> val testDFWithPredictions = fittedPipeline.transform(testDF)
testDFWithPredictions: DataFrame = [fileName: string, ...

This has added a prediction column to the DataFrame with the predictions of our logistic regression model. To measure the performance of our algorithm, we calculate the classification error on the test set:

scala> testDFWithPredictions.filter { 
  $"label" !== $"prediction" 
}.count
Long = 20

Thus, our naive logistic regression algorithm, with no model selection or regularization, misclassifies 2.3% of e-mails. You may, of course, get slightly different results, since the train-test split was random.
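
To express this as a rate rather than a raw count, we can divide by the size of the test set. A small sketch, reusing testDFWithPredictions:

val testError = testDFWithPredictions.filter {
  $"label" !== $"prediction"
}.count.toDouble / testDFWithPredictions.count
// The fraction of misclassified test messages, roughly 0.023 for the run above.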

Let's save the training and test DataFrames, with predictions, as parquet files:

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> (labelledDFWithPredictions
  .select("fileName", "label", "prediction", "probability")
  .write.mode(SaveMode.Overwrite)
  .parquet("transformedTrain.parquet"))

scala> (testDFWithPredictions
  .select("fileName", "label", "prediction", "probability")
  .write.mode(SaveMode.Overwrite)
  .parquet("transformedTest.parquet"))

Tip

In spam classification, a false positive is considerably worse than a false negative: it is much worse to classify a legitimate message as spam than it is to let a spam message through. To account for this, we could increase the threshold for classification: only messages that score, for instance, 0.7 or above would get classified as spam. This raises the obvious question of choosing the right threshold. One way to do this would be to investigate the false positive rate incurred in the test set for different thresholds, and to choose the lowest threshold that gives us an acceptable false positive rate. A good way of visualizing this is to use ROC curves, which we will investigate in the next section.
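
As a sketch (the 0.7 value is just the illustrative threshold mentioned above), raising the threshold amounts to configuring the classifier differently before fitting the pipeline:

val conservativeClassifier = new LogisticRegression()
  .setMaxIter(50)
  .setThreshold(0.7)  // flag as spam only when the predicted spam probability exceeds 0.7

The pipeline would then be rebuilt with conservativeClassifier in place of classifier and refitted to the training set.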
