Binary classification using LogisticRegression with Pipeline API

Earlier, with the spam example on binary classification, we saw how we prepared the data, separated it into training and test data, trained the model, and evaluated it against test data before we finally arrived at the metrics. This series of steps can be abstracted in a simplified manner using Spark's Pipeline API.

In this recipe, we'll take a look at how to use the Pipeline API to solve the same classification problem. Imagine the pipeline to be a factory assembly line where things happen one after another. In our case, we'll pass our raw unprocessed data through various processors before we finally feed the data into the classifier.

How to do it...

In this recipe, we'll classify the same spam/ham dataset (https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection) first using the plain Pipeline, and then using a cross-validator to select the best model for us given a grid of parameters.

Let's summarize the steps:

  1. Importing and splitting data as test and training sets.
  2. Constructing the participants of the Pipeline.
  3. Preparing a pipeline and training a model.
  4. Predicting against test data.
  5. Evaluating the model without cross-validation.
  6. Constructing parameters for cross-validation.
  7. Constructing a cross-validator and fitting the best model.
  8. Evaluating a model with cross-validation.

Importing and splitting data as test and training sets

This process is a little different from the previous recipe, in the sense that we don't construct LabeledPoint now. Instead of an RDD of LabeledPoint, the pipeline requires a DataFrame. So, we convert each line of text into a Document object (with the label and the content) and then convert RDD[Document] into a DataFrame by calling the toDF() function on the RDD:

case class Document(label: Double, content: String)

  val docs = sc.textFile("SMSSpamCollection").map(line => {
    val words = line.split("\t")
    val label=if (words.head.trim()=="spam") 1.0 else 0.0
    Document(label, words.tail.mkString(" "))
  })

  //Split dataset
  val spamPoints = docs.filter(doc => doc.label==1.0).randomSplit(Array(0.8, 0.2))
  val hamPoints = docs.filter(doc => doc.label==0.0).randomSplit(Array(0.8, 0.2))

  println("Spam count:" + (spamPoints(0).count) + "::" + (spamPoints(1).count))
  println("Ham count:" + (hamPoints(0).count) + "::" + (hamPoints(1).count))

  val trainingSpamSplit = spamPoints(0)
  val testSpamSplit = spamPoints(1)

  val trainingHamSplit = hamPoints(0)
  val testHamSplit = hamPoints(1)

  val trainingSplit = trainingSpamSplit ++ trainingHamSplit
  val testSplit = testSpamSplit ++ testHamSplit

  import sqlContext.implicits._
  val trainingDFrame=trainingSplit.toDF()
  val testDFrame=testSplit.toDF()

Constructing the participants of the Pipeline

In order to arrange the pipeline, we need to construct its participants. There are three participants (or pipeline stages) in this pipeline, and we have to line them up in the right order:

  • Tokenizer: This breaks the sentence into tokens
  • HashingTF: This creates a term frequency vector from the tokens
  • LogisticRegression: This is the classification algorithm itself

An IDF stage could be placed after HashingTF to reweight the term frequencies, but this recipe feeds the raw term frequencies to the classifier. The label always stays in its own column and is never merged into the feature vector.

Let's construct these first:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val tokenizer=new Tokenizer().setInputCol("content").setOutputCol("tokens")
val hashingTf=new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")

val logisticRegression=new LogisticRegression().setFeaturesCol("features").setLabelCol("label").setMaxIter(10)

When the training DataFrame is run against the first pipeline stage, that is, Tokenizer, the "content" column is taken as the input column, and the output of the tokenizer is a bag of words that is captured in the "tokens" output column. HashingTF takes the "tokens" and converts them into a term frequency vector in the "features" column. Notice that the input column of HashingTF is the same as the output column from the previous stage.

Finally, for the LogisticRegression stage, we specify the features column and the label column. However, if the input DataFrame has a column named "label" with a Double type and a column named "features" of type Vector, there is no need to explicitly mention that, because these are the defaults. So, in our case, since we have "label" as an attribute of the Document case class and the output column of the HashingTF is named "features", there is no need for us to specify them explicitly. The following code would work just fine:

val logisticRegression=new LogisticRegression().setMaxIter(10)

Internally, this implementation of LogisticRegression extracts the label and the feature vector from each row of the data, and fits the model with a quasi-Newton optimizer (L-BFGS, or OWL-QN when an L1 penalty is applied).

At every stage, each of these transformations occurs against the input DataFrame of that particular stage, and the transformed DataFrame gets passed along until the final stage.
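
To make the first two stages more concrete, here is a minimal plain-Scala sketch of what tokenizing and term-frequency hashing amount to. It is only an illustration and does not use Spark: the object and method names are hypothetical, and String.hashCode is assumed as a stand-in for whatever hash function HashingTF applies internally.

```scala
object HashingTfSketch {
  // Lowercase the text and split it on whitespace, roughly what Tokenizer does.
  def tokenize(content: String): Seq[String] =
    content.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

  // Map each token to a bucket in [0, numFeatures) and count the hits per bucket.
  // Assumption: String.hashCode stands in for Spark's internal hash function.
  def termFrequencies(tokens: Seq[String], numFeatures: Int): Map[Int, Double] =
    tokens
      .map(token => java.lang.Math.floorMod(token.hashCode, numFeatures))
      .groupBy(identity)
      .map { case (bucket, hits) => bucket -> hits.size.toDouble }

  def main(args: Array[String]): Unit = {
    val tokens = tokenize("Free entry free prize")
    println(tokens)
    println(termFrequencies(tokens, 1000))
  }
}
```

The result is a sparse vector of fixed length: only the buckets that were hit carry a count. Two different words can land in the same bucket, which is why the vector length is worth tuning.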

Preparing a pipeline and training a model

As the next step, we just need to form a pipeline out of the various pipeline stages that we constructed in the previous step. We then train a model by calling the pipeline.fit function:

val pipeline=new Pipeline()

pipeline.setStages(Array(tokenizer, hashingTf, logisticRegression))
val model=pipeline.fit(trainingDFrame)

Note

If you are getting java.lang.IllegalArgumentException: requirement failed: Column label must be of type DoubleType but was actually StringType, it just means that your label isn't of the Double type.

Predicting against test data

Using the newly constructed model to predict the data is just a matter of calling the transform method of the model. Then, we extract the predicted value and the actual label, in that order, because the metrics classes that we use later expect (prediction, label) pairs:

  val predictsAndActualsNoCV:RDD[(Double,Double)]=model.transform(testDFrame).rdd.map(r => (r.getAs[Double]("prediction"), r.getAs[Double]("label"))).cache()

Evaluating a model without cross-validation

Cross-validation is a model validation technique that runs over multiple iterations. The training data is split into k subsets (folds), and in each iteration, the model is trained on k-1 of them and validated on the remaining one, so that every fold serves as the validation set exactly once. For this recipe, we'll run the algorithm first without cross-validation, and then with cross-validation.

First, we'll use the same validation metric and method that we used in the previous recipe. We will simply calculate the area under the ROC curve, the accuracy, the precision, and the confusion matrix:

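import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.rdd.RDD

// Expects (prediction, label) pairs
def calculateMetrics(predictsAndActuals: RDD[(Double, Double)], algorithm: String): Unit = {

    // Fraction of rows where the prediction equals the label
    val accuracy = 1.0 * predictsAndActuals.filter(predActs => predActs._1 == predActs._2).count() / predictsAndActuals.count()
    val binMetrics = new BinaryClassificationMetrics(predictsAndActuals)
    println(s"************** Printing metrics for $algorithm ***************")
    println(s"Area under ROC ${binMetrics.areaUnderROC}")
    println(s"Accuracy $accuracy")

    val metrics = new MulticlassMetrics(predictsAndActuals)
    // Overall precision (deprecated from Spark 2.0 in favor of metrics.accuracy)
    println(s"Precision : ${metrics.precision}")
    println(s"Confusion Matrix \n${metrics.confusionMatrix}")
    println(s"************** ending metrics for $algorithm *****************")
  }

  calculateMetrics(predictsAndActualsNoCV, "Without Cross validation")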

A sample output of this pipeline without cross-validation is as follows:

************** Printing metrics for Without Cross validation ***************
Area under ROC 0.9676924738149228
Accuracy 0.9656357388316151
Confusion Matrix
993.0  36.0
4.0    131.0
************** ending metrics for Without Cross validation *****************
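
As a quick sanity check on the sample output, the accuracy can be recomputed from the confusion matrix alone: it is the sum of the diagonal divided by the sum of all cells. The following plain-Scala sketch (the object and method names are hypothetical, and no Spark is involved) does this for the matrix printed above.

```scala
object ConfusionMatrixSketch {
  // Accuracy = correctly classified rows / all rows
  //          = sum of the diagonal / sum of every cell.
  def accuracy(matrix: Array[Array[Double]]): Double = {
    val correct = matrix.indices.map(i => matrix(i)(i)).sum
    val total = matrix.map(_.sum).sum
    correct / total
  }

  def main(args: Array[String]): Unit = {
    // Confusion matrix copied from the sample output above.
    val sample = Array(Array(993.0, 36.0), Array(4.0, 131.0))
    println(accuracy(sample))
  }
}
```

Here, (993 + 131) / 1164 comes to roughly 0.9656, which agrees with the Accuracy line in the sample output.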

Constructing parameters for cross-validation

Before we use the cross-validator to choose the best model that fits the data, we would want to provide each of the parameters a set of alternate values that the validator can choose from.

The way we provide alternate values is in the form of a parameter grid:

val paramGrid=new ParamGridBuilder()
    .addGrid(hashingTf.numFeatures, Array(1000, 5000, 10000))
    .addGrid(logisticRegression.regParam, Array(1, 0.1, 0.03, 0.01))
    .build()

So, we say that the length of the term frequency vector that we want HashingTF to generate (that is, its number of hash buckets) could be one of 1,000, 5,000, and 10,000, and the regularization parameter for logistic regression could be one of 1, 0.1, 0.03, and 0.01. Thus, in essence, we are passing a 3 x 4 grid of 12 parameter combinations.
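
To see what the grid expands to, here is a small plain-Scala sketch that enumerates every pairing of the two value lists. It only mirrors the idea of a parameter grid; the object and method names are hypothetical, and it is not how ParamGridBuilder is implemented.

```scala
object ParamGridSketch {
  // Every pairing of one numFeatures value with one regParam value.
  def grid(numFeatures: Seq[Int], regParams: Seq[Double]): Seq[(Int, Double)] =
    for (n <- numFeatures; r <- regParams) yield (n, r)

  def main(args: Array[String]): Unit = {
    val combos = grid(Seq(1000, 5000, 10000), Seq(1.0, 0.1, 0.03, 0.01))
    println(s"${combos.size} combinations")
    combos.foreach(println)
  }
}
```

Each pair corresponds to one candidate configuration that the cross-validator will train and score.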

Constructing a cross-validator and fitting the best model

Next, we construct a cross-validator and pass in the following parameters:

  • The parameter grid that we constructed in the previous step.
  • The pipeline that we constructed in step 3.
  • An evaluator for the cross-validator to decide which model is better.
  • The number of folds. Say, if we set the number of folds to 10, the training data would be split into 10 blocks. In each of the 10 iterations, a different block is held out as the validation set, and the other nine form the training set. This is repeated for every combination in the parameter grid, so our 12 combinations require 120 pipeline fits, plus a final fit of the best combination on the full training data:

    val crossValidator=new CrossValidator()
        .setEstimator(pipeline)
        .setEvaluator(new BinaryClassificationEvaluator())
        .setEstimatorParamMaps(paramGrid)
        .setNumFolds(10)

We finally let the cross-validator run against the training dataset and derive the best model out of it. Contrast the following line with pipeline.fit, where we skipped cross-validation:

val bestModel=crossValidator.fit(trainingDFrame)
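
The fold rotation that happens inside the cross-validator can be sketched in plain Scala as follows. This is an illustration under stated assumptions: the object and method names are hypothetical, and rows are assigned to folds round-robin by index, whereas Spark assigns rows to folds randomly.

```scala
object KFoldSketch {
  // Returns one (trainingIndices, validationIndices) pair per round.
  // Assumption: row i belongs to fold i % k; Spark samples folds randomly instead.
  def folds(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] = {
    val indices = 0 until n
    (0 until k).map { fold =>
      val (validation, training) = indices.partition(_ % k == fold)
      (training, validation)
    }
  }

  def main(args: Array[String]): Unit = {
    folds(20, 10).zipWithIndex.foreach { case ((training, validation), round) =>
      println(s"round $round: train on ${training.size} rows, validate on $validation")
    }
  }
}
```

Every row is used for validation exactly once and for training in the remaining rounds, which is what makes the averaged score less dependent on a single lucky or unlucky split.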

Evaluating the model with cross-validation

Now, let's evaluate the model that is generated against the actual test dataset (rather than the validation folds that the cross-validator uses internally). As before, we extract (prediction, label) pairs:

  val predictsAndActualsWithCV:RDD[(Double,Double)]=bestModel.transform(testDFrame).rdd.map(r => (r.getAs[Double]("prediction"), r.getAs[Double]("label"))).cache()

 calculateMetrics(predictsAndActualsWithCV, "Cross validation")

A sample output of this pipeline with cross-validation is as follows:

************** Printing metrics for Cross validation ***************
Area under ROC 0.9968220338983051
Accuracy 0.994579945799458
Confusion Matrix
938.0  6.0
0.0    163.0
************** ending metrics for Cross validation *****************

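As we can see, the area under ROC for this model (about 0.997) is higher than for the pipeline without cross-validation (about 0.968) and for any of our previously generated models. Note that the two sample runs used different random splits (their confusion matrices total 1,107 and 1,164 test messages), so the gap is indicative rather than exact.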
