The remaining part of our application is the classification itself. As introduced earlier, the reason for using Twitter was to steal ground truth from an external resource. We will train a Naive Bayes classification model using Twitter data and use it to predict categories for the GDELT URLs. A convenient side effect of the Kappa architecture approach is that we do not have to worry much about exporting common pieces of code across different applications or different environments. Even better, we do not have to export/import our model between a batch and a speed layer (GDELT and Twitter, sharing the same Spark context, are both part of the same physical layer). We could save our model to HDFS for auditing purposes, but we simply need to pass a reference to a Scala object between both classes.
We've already introduced both the concept of bootstrapping a Naive Bayes model using Stack Exchange datasets and the use of a Classifier object that builds LabeledPoints out of text content. We will create a ClassifierModel case class that wraps both a Naive Bayes model and its associated label dictionary, and that exposes both a predict and a save method:
case class ClassifierModel(
  model: NaiveBayesModel,
  labels: Map[String, Double]
) {

  def predictProbabilities(vectors: RDD[Vector]) = {
    val sc = vectors.sparkContext
    val bLabels = sc.broadcast(labels.map(_.swap))
    model.predictProbabilities(vectors).map { vector =>
      bLabels.value
        .toSeq
        .sortBy(_._1)
        .map(_._2)
        .zip(vector.toArray)
        .toMap
    }
  }

  def save(sc: SparkContext, outputDir: String) = {
    model.save(sc, s"$outputDir/model")
    sc.parallelize(labels.toSeq)
      .saveAsObjectFile(s"$outputDir/labels")
  }

}
Because more than one hashtag could be necessary to fully describe an article's content, we predict a probability distribution instead of a single category, using the predictProbabilities function. We convert our label identifiers (as Double) back to the original categories (as String) using the label dictionary we saved alongside the model. Finally, for auditing purposes only, we can save both our model and the label dictionary to HDFS.
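To make the dictionary swap performed inside predictProbabilities concrete, here is a minimal, Spark-free sketch of the same logic applied to a plain array; the labels map and the probability values are made-up examples for illustration:

```scala
// Hypothetical label dictionary, as built at training time (category -> label id)
val labels = Map("#obama" -> 0.0, "#syria" -> 1.0, "#ebola" -> 2.0)

// A probability vector as returned by the model, ordered by label id
val probabilities = Array(0.7, 0.2, 0.1)

// Swap the dictionary (label id -> category), sort by id, then zip
// against the probabilities to recover a readable distribution
val distribution = labels.map(_.swap)
  .toSeq
  .sortBy(_._1)
  .map(_._2)
  .zip(probabilities)
  .toMap

// distribution: Map(#obama -> 0.7, #syria -> 0.2, #ebola -> 0.1)
```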
All MLlib models support both a save and a load function. Data is persisted as ObjectFile in HDFS and can easily be retrieved and deserialized. Using the ML library, objects are saved in Parquet format. One would need, however, to save additional pieces of information, such as, in our example, the label dictionary used to train that model.
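For completeness, a model saved this way can be restored symmetrically. The following sketch mirrors the save method of our ClassifierModel; the load function itself is our own addition, not part of the original Classifier:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayesModel

// Hypothetical loader mirroring ClassifierModel.save: reads the Naive Bayes
// model and the label dictionary back from the two output directories
def load(sc: SparkContext, inputDir: String): ClassifierModel = {
  val model = NaiveBayesModel.load(sc, s"$inputDir/model")
  val labels = sc.objectFile[(String, Double)](s"$inputDir/labels")
    .collect()
    .toMap
  ClassifierModel(model, labels)
}
```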
Our Classifier is a singleton object and, as per the singleton pattern, should be thread safe. That means that parallel threads should not modify the same state using, for instance, a setter method. In our current architecture, only Twitter will be training and updating a new model every 15 minutes, and that model will only be used by the GDELT service (no concurrent updates). However, there are two important things to take into consideration. The first is that, because Scala case classes are immutable, updating the model simply amounts to replacing the reference to our ClassifierModel case class. The second is the way a trained model is shipped to the executors. The first example below serializes the model inside the closure, sending a copy of it along with every single task:

val model = NaiveBayes.train(points)
vectors.map { vector =>
  model.predict(vector)
}
The second example (used by default by Spark) broadcasts a model to all executors at once, hence guaranteeing the overall consistency of the predicting phase:
val model = NaiveBayes.train(points)
val bcModel = sc.broadcast(model)
vectors.mapPartitions { it =>
  val model = bcModel.value
  it.map { vector =>
    model.predict(vector)
  }
}
In our Classifier singleton object, we define our model as a global variable (optional, as it may not exist yet) that will be updated after each call to the train method:
var model = None: Option[ClassifierModel]

def train(rdd: RDD[(String, Array[String])]): ClassifierModel = {
  val labeledPoints = buildLabeledPoints(rdd)
  val labels = getLabels(rdd)
  labeledPoints.cache()
  val nbModel = NaiveBayes.train(labeledPoints)
  labeledPoints.unpersist(blocking = false)
  val cModel = ClassifierModel(nbModel, labels)
  model = Some(cModel)
  cModel
}
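The predictProbabilities entry point used later by the GDELT stream can then simply delegate to whichever model is currently referenced. The sketch below is an assumption about how that delegation might look: it relies on a hypothetical buildVectors helper (symmetric to buildLabeledPoints) turning raw text into feature vectors, and returns empty distributions while no model has been trained yet:

```scala
// Hypothetical delegation method on the Classifier singleton;
// buildVectors is an assumed helper, not shown in the original code
def predictProbabilities(rdd: RDD[String]): RDD[Map[String, Double]] = {
  model match {
    case Some(classifierModel) =>
      // Use the latest model trained from the Twitter stream
      classifierModel.predictProbabilities(buildVectors(rdd))
    case None =>
      // No model published yet: return empty distributions
      rdd.map(_ => Map.empty[String, Double])
  }
}
```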
Coming back to our Twitter stream, for each RDD we build our training set (abstracted within our Classifier), train a new model, and then save it to HDFS:
def trainAndSave(
  trainingSet: RDD[(String, Array[String])],
  modelOutputDir: String) = {
  Classifier
    .train(trainingSet)
    .save(trainingSet.sparkContext, modelOutputDir)
}
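As a usage sketch, this method would typically be invoked from the 15-minute Twitter stream itself. The twitterContent DStream, its tweet fields, and the output path below are assumptions standing in for the hashtag extraction described earlier:

```scala
// Hypothetical wiring: every Twitter batch of (text, hashtags) pairs
// retrains the model and archives it to HDFS
twitterContent.foreachRDD { batch =>
  val trainingSet = batch.map { tweet =>
    (tweet.body, tweet.hashTags)
  }
  trainAndSave(trainingSet, "/gzet/models")
}
```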
Using the Classifier singleton object, we can access the latest model published by the Twitter processor. For each RDD, for each article, we simply predict the probability distribution of hashtags that describes the article's text content:
gdeltContent.foreachRDD { batch =>

  val textRdd = batch.map(_.body.get)
  val predictions = Classifier.predictProbabilities(textRdd)

  batch.zip(predictions).map { case (content, dist) =>
    val hashTags = dist.filter { case (hashTag, proba) =>
      proba > 0.25d
    }
    .toSeq
    .map(_._1)
    (content, hashTags)
  }
  .map { case (content, hashTags) =>
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    Map(
      "time"  -> sdf.format(new Date()),
      "body"  -> content.body.get,
      "url"   -> content.url,
      "tags"  -> hashTags,
      "title" -> content.title
    )
  }
  .saveToEs("gzet/gdelt")
}
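Note that saveToEs is brought into scope by the elasticsearch-hadoop connector. A minimal configuration sketch is shown below; the node address, port, and application name are assumptions to adjust to your own cluster:

```scala
import org.apache.spark.SparkConf
import org.elasticsearch.spark._

// Hypothetical Spark configuration pointing at a local Elasticsearch node
val sparkConf = new SparkConf()
  .setAppName("GZET")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")
```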
We only keep probabilities higher than 25% and publish each article together with its predicted hashtags into our Elasticsearch cluster. Publishing the results officially marks the end of our classification application. We report the full architecture here: