Classifying data

The remaining part of our application is to start classifying data. As introduced earlier, the reason for using Twitter was to steal ground truth from external resources: we will train a Naive Bayes classification model using Twitter data and use it to predict the categories of the GDELT URLs. The convenient side of the Kappa architecture approach is that we do not have to worry much about exporting common pieces of code across different applications or environments. Even better, we do not have to export/import our model between a batch and a speed layer (both GDELT and Twitter, sharing the same Spark context, are part of the same physical layer). We could save our model to HDFS for auditing purposes, but we simply need to pass a reference to a Scala object between the two classes.

Training a Naive Bayes model

We've already introduced both the concept of bootstrapping a Naive Bayes model using Stack Exchange datasets and the use of a Classifier object that builds LabeledPoints out of text content. We will create a ClassifierModel case class that wraps a Naive Bayes model together with its associated label dictionary and exposes both a predictProbabilities and a save method:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

case class ClassifierModel(
  model: NaiveBayesModel,
  labels: Map[String, Double]
) {

  // Predict a probability distribution over categories for each vector.
  // The broadcast, inverted dictionary maps label ids back to hashtags.
  def predictProbabilities(vectors: RDD[Vector]): RDD[Map[String, Double]] = {
    val sc = vectors.sparkContext
    val bLabels = sc.broadcast(labels.map(_.swap))
    model.predictProbabilities(vectors).map { vector =>
      bLabels.value
        .toSeq
        .sortBy(_._1)
        .map(_._2)
        .zip(vector.toArray)
        .toMap
    }
  }

  // Persist both the model and its label dictionary to HDFS.
  def save(sc: SparkContext, outputDir: String) = {
    model.save(sc, s"$outputDir/model")
    sc.parallelize(labels.toSeq)
      .saveAsObjectFile(s"$outputDir/labels")
  }

}

Because more than one hashtag may be needed to fully describe an article's content, we instead predict a probability distribution using the predictProbabilities function. We convert our label identifiers (as Double) back to the original categories (as String) using the label dictionary saved alongside the model. Finally, for auditing purposes only, we can save both the model and the label dictionary to HDFS.

Tip

All MLlib models support both a save and a load function. Data is persisted as an ObjectFile in HDFS and can easily be retrieved and deserialized. With the ML library, objects are saved in Parquet format. One would, however, need to save additional pieces of information, such as, in our example, the label dictionary used for training that model.
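As a complement, a model and its label dictionary persisted this way could be restored as follows; this load sketch simply mirrors the directory layout used by the save method above:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayesModel

// Restore both artifacts written by ClassifierModel.save; the
// $outputDir/model and $outputDir/labels layout mirrors the save method.
def load(sc: SparkContext, outputDir: String): ClassifierModel = {
  val model = NaiveBayesModel.load(sc, s"$outputDir/model")
  val labels = sc.objectFile[(String, Double)](s"$outputDir/labels")
    .collect()
    .toMap
  ClassifierModel(model, labels)
}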

Thread safety

Our Classifier is a singleton object and, as per the singleton pattern, should be thread-safe. This means that parallel threads should not modify the same state using, for instance, a setter method. In our current architecture, only Twitter trains and publishes a new model every 15 minutes, and that model is only consumed by the GDELT service (so there are no concurrent updates). However, there are two important things to take into consideration:

  1. Firstly, our model has been trained using distinct labels (hashtags found in a 24-hour time window, extracted every 15 minutes). A new model will be trained against an updated dictionary. The model and its labels are tightly coupled and must therefore be kept in sync. In the unlikely event of GDELT pulling the labels while Twitter is updating the model, our predictions would be inconsistent. We ensure thread safety by wrapping both the labels and the model within the same ClassifierModel case class.
  2. The second (although less critical) concern is that our process is parallel. That means that similar tasks will be executed simultaneously from different executors, on different chunks of data. At any point in time, we need all executors to hold the same version of the model, although predicting a particular chunk of data with a slightly out-of-date model is still technically valid (as long as that model and its labels are synchronized). We illustrate this with the two following examples. The first one cannot guarantee consistency of the model across executors:
// The model is captured by the closure and serialized with each task
val model = NaiveBayes.train(points)
vectors.map { vector =>
  model.predict(vector)
}

The second example (the approach Spark itself uses internally when predicting an RDD) broadcasts the model to all executors at once, hence guaranteeing the overall consistency of the prediction phase:

// The model is broadcast once; every executor reads the same broadcast value
val model = NaiveBayes.train(points)
val bcModel = sc.broadcast(model)
vectors.mapPartitions { it =>
  val model = bcModel.value
  it.map { vector =>
    model.predict(vector)
  }
}

In our Classifier singleton object, we define our model as a global variable (an Option, since it may not exist yet) that is updated after each call to the train method:

// Latest trained model, if any; replaced after each call to train
var model: Option[ClassifierModel] = None

def train(rdd: RDD[(String, Array[String])]): ClassifierModel = {
  val labeledPoints = buildLabeledPoints(rdd)
  val labels = getLabels(rdd)
  labeledPoints.cache()
  val nbModel = NaiveBayes.train(labeledPoints)
  labeledPoints.unpersist(blocking = false)
  val cModel = ClassifierModel(nbModel, labels)
  model = Some(cModel)
  cModel
}
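
To see how this ties in with the GDELT side (which calls Classifier.predictProbabilities on raw text), here is a minimal sketch of how the singleton might delegate to the latest model; the buildVectors helper, mirroring the feature extraction used in buildLabeledPoints, is an assumption:

// Sketch: delegate predictions to the latest model, if one exists.
// buildVectors is a hypothetical helper turning raw text into the same
// feature vectors used when building LabeledPoints.
def predictProbabilities(rdd: RDD[String]): RDD[Map[String, Double]] = {
  model match {
    case Some(cModel) => cModel.predictProbabilities(buildVectors(rdd))
    case None         => rdd.map(_ => Map.empty[String, Double])
  }
}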

Coming back to our Twitter stream, for each RDD, we build our training set (abstracted within our Classifier), train a new model, and then save it to HDFS:

def trainAndSave(trainingSet: RDD[(String, Array[String])], modelOutputDir: String) = {
  Classifier
    .train(trainingSet)
    .save(trainingSet.sparkContext, modelOutputDir)
}
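
In context, this method would be invoked from the Twitter DStream; the following sketch uses a hypothetical twitterTags stream of (text, hashtags) pairs and an arbitrary output directory:

// Hypothetical wiring: twitterTags is assumed to be a
// DStream[(String, Array[String])] of tweet bodies and their hashtags
twitterTags.foreachRDD { trainingSet =>
  trainAndSave(trainingSet, "/user/gzet/classifier")
}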

Predict the GDELT data

Using the Classifier singleton object, we can access the latest model published by the Twitter processor. For each RDD and each article, we simply predict the hashtag probability distribution that describes the article's text content:

gdeltContent.foreachRDD { batch =>

  // Predict a hashtag probability distribution for each article body
  val textRdd = batch.map(_.body.get)
  val predictions = Classifier.predictProbabilities(textRdd)

  batch.zip(predictions).map { case (content, dist) =>
    // Only keep hashtags predicted with a probability above 25%
    val hashTags = dist.filter { case (hashTag, proba) =>
      proba > 0.25d
    }
    .toSeq
    .map(_._1)
    (content, hashTags)
  }
  .map { case (content, hashTags) =>
    // Build a JSON-friendly document for Elasticsearch
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    Map(
      "time"  -> sdf.format(new Date()),
      "body"  -> content.body.get,
      "url"   -> content.url,
      "tags"  -> hashTags,
      "title" -> content.title
    )
  }
  .saveToEs("gzet/gdelt")

}
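
Note that saveToEs comes from the elasticsearch-hadoop connector (brought into scope with import org.elasticsearch.spark._) and picks up the cluster location from the Spark configuration. A minimal sketch, with a placeholder application name and cluster coordinates, could look as follows:

import org.apache.spark.SparkConf

// Placeholder Elasticsearch settings for the elasticsearch-hadoop connector;
// replace the host and port with your own cluster values
val sparkConf = new SparkConf()
  .setAppName("GzetClassifier")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")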

We only keep probabilities higher than 25% and publish each article, together with its predicted hashtags, to our Elasticsearch cluster. Publishing the results officially marks the end of our classification application. We report the full architecture here:

Figure 11: An innovative way of tagging news articles
