Chapter 9.  News Dictionary and Real-Time Tagging System

While a hierarchical data warehouse stores data in files and folders, a typical Hadoop-based system relies on a flat architecture to store data. Without proper data governance, or a clear understanding of what the data actually contains, there is a real risk of turning data lakes into swamps, where an interesting dataset such as GDELT becomes nothing more than a folder holding a vast number of unstructured text files. For that reason, data classification is probably one of the most widely used machine learning techniques in large-scale organizations, as it allows users to properly categorize and label their data, publish these categories as part of their metadata solutions, and therefore access specific information in the most efficient way. Without a proper tagging mechanism executed upfront, ideally at ingest, finding all news articles about a specific topic would require parsing the entire dataset looking for specific keywords. In this chapter, we describe an innovative way of labeling incoming GDELT data in a non-supervised way and in near real time, using both Spark Streaming and the 1% Twitter firehose.

We will cover the following topics:

  • Bootstrapping a Naive Bayes classifier using Stack Exchange data
  • Lambda versus Kappa architecture for real-time streaming applications
  • Kafka and Twitter4J within a Spark Streaming application
  • Thread safety when deploying models
  • Using Elasticsearch as a caching layer

The Mechanical Turk

Data classification is a supervised learning technique. This means that you can only predict the labels and categories you have learned from a training dataset. Because that training dataset has to be properly labeled, obtaining those labels becomes the main challenge we address in this chapter.

Human intelligence tasks

None of our data, in the context of news articles, has been labeled upfront; as it stands, there is nothing we can learn from it. The common-sense approach for data scientists is to start labeling some input records manually, records that will then serve as a training dataset. However, because the number of classes may be relatively large, at least in our case (hundreds of labels), the amount of data to label could be significant (thousands of articles) and would require tremendous effort. A first solution is to outsource this laborious task to a "Mechanical Turk", the term being a reference to one of the most famous hoaxes in history, an automated chess player that fooled many of the notable figures of its time (https://en.wikipedia.org/wiki/The_Turk). The term now commonly describes a process that appears to be done by a machine but is in reality done by a hidden person, hence a Human Intelligence Task.

For the reader's information, Amazon runs a Mechanical Turk service (https://www.mturk.com/mturk/welcome), where individuals can register to perform human intelligence tasks such as labeling input data or detecting the sentiment of text content. Crowdsourcing the task could be one viable solution, assuming you are allowed to share this internal (and potentially confidential) dataset with a third party. The alternative solution described here is to bootstrap a classification model using a pre-existing labeled dataset.

Bootstrapping a classification model

A text classification algorithm usually learns from term frequency vectors; a possible approach is therefore to train a model on external resources with a similar context. For instance, one could classify unlabeled IT-related content using categories learned from a full dump of the Stack Overflow website. Because Stack Exchange is not reserved for IT professionals only, one can find datasets in many different contexts that can serve many purposes (https://archive.org/download/stackexchange).

Learning from Stack Exchange

We will demonstrate here how to bootstrap a simple Naive Bayes classification model using the home-brewing dataset from the Stack Exchange website:

$ wget https://archive.org/download/stackexchange/beer.stackexchange.com.7z
$ 7z e beer.stackexchange.com.7z

We create a few methods that pull both the body and the labels from all XML documents, extract the clean text content out of the HTML-encoded body (using the Goose scraper introduced in Chapter 6, Scraping Link-Based External Data), and finally convert our RDD of XML documents into a Spark DataFrame. The different methods are not reported here, but they can be found in our code repository. Note that the Goose scraper can be used offline by providing the HTML content (as a string) alongside a dummy URL.
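To give an idea of what this pre-processing involves, here is a minimal, illustrative sketch of such a parsing routine (the helper name parsePosts and the naive regex-based HTML stripping are ours for illustration only; the repository version delegates the HTML extraction to Goose, called offline with a dummy URL):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.util.Try
import scala.xml.XML

// Illustrative sketch only: extract (body, tags) pairs from Posts.xml rows
// and return them as a DataFrame. The repository version strips HTML using
// the Goose scraper rather than a regex.
def parsePosts(spark: SparkSession, lines: RDD[String]): DataFrame = {
  import spark.implicits._
  val tagPattern = "<([^>]+)>".r
  lines
    .filter(_.trim.startsWith("<row"))
    .flatMap { line =>
      Try {
        val row = XML.loadString(line.trim)
        val body = (row \ "@Body").text.replaceAll("<[^>]+>", " ")
        val tags = tagPattern
          .findAllMatchIn((row \ "@Tags").text)   // <hops><brewing> -> [hops, brewing]
          .map(_.group(1))
          .toArray
        (body, tags)
      }.toOption
    }
    .filter { case (body, tags) => body.trim.nonEmpty && tags.nonEmpty }
    .toDF("body", "tags")
}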

We provide the reader with a convenient parse method that can be used for pre-processing any Post.xml data from the Stack Exchange website. This function is part of our StackBootstraping code available in our code repository:

import io.gzet.tagging.stackoverflow.StackBootstraping

val spark = SparkSession.builder()
  .appName("StackExchange")
  .getOrCreate()

val sc = spark.sparkContext
val rdd = sc.textFile("/path/to/posts.xml")
val brewing = StackBootstraping.parse(rdd)

brewing.show(5)

+--------------------+--------------------+
|                body|                tags|
+--------------------+--------------------+
|I was offered a b...|              [hops]|
|As far as we know...|           [history]|
|How is low/no alc...|           [brewing]|
|In general, what'...|[serving, tempera...|
|Currently I am st...| [pilsener, storage]|
+--------------------+--------------------+

Building text features

With our beer content properly labeled, the remaining process is to bootstrap the algorithm itself. For that purpose, we use a simple Naive Bayes classification algorithm that determines the conditional probability of a label given an item's features. We first collect all distinct labels, assign a unique identifier (as Double), and broadcast our label dictionary to the Spark executors:

import org.apache.spark.sql.functions.explode

// Explode the tag arrays, collect the distinct tags and assign each
// a unique label identifier (as Double, starting at 1.0)
val labelMap = brewing
  .select("tags")
  .withColumn("tag", explode(brewing("tags")))
  .select("tag")
  .distinct()
  .rdd
  .map(_.getString(0))
  .zipWithIndex()
  .mapValues(_.toDouble + 1.0d)

labelMap.take(5).foreach(println)

/*
(imperal-stout,1.0)
(malt,2.0)
(lent,3.0)
(production,4.0)
(local,5.0)
*/

Tip

As introduced earlier, make sure that large collections used inside a Spark transformation are broadcast to all Spark executors. This reduces the cost associated with network transfer.
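In our case, the label dictionary easily fits in memory, so it can be collected on the driver and broadcast; the bLabelMap handle used in the following snippets could be created as sketched here:

// Collect the (tag -> label id) dictionary on the driver and broadcast it
// to all executors; bLabelMap is the handle referenced in later snippets
val bLabelMap = sc.broadcast(labelMap.collectAsMap())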

A LabeledPoint is composed of a label (as Double) and features (as Vector). A common practice for building features out of text content is to build term frequency vectors, where each word across all documents corresponds to a specific dimension. With hundreds of thousands of dimensions (the estimated number of words in English is 1,025,109), this high-dimensional space would be particularly inefficient for most machine learning algorithms. In fact, when Naive Bayes multiplies probabilities (all lower than 1), there is a real risk of reaching 0 due to machine precision issues (numerical underflow, as described in Chapter 14, Scalable Algorithm). Data scientists overcome that constraint using dimensionality reduction, projecting a sparse vector into a denser space while preserving distance measures (the principle of dimensionality reduction is covered in Chapter 10, Story De-duplication and Mutation). Although many algorithms and techniques exist for that purpose, we will use the hashing utility provided by Spark.

With a vector size of n (2^20, that is 1,048,576, by default), its transform method hashes each word into one of the n buckets according to its hash value, and sums up the bucket frequencies to build denser vectors.
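As a quick illustration of this hashing trick, the following sketch uses the MLlib HashingTF with a deliberately small vector size (the words and size here are arbitrary examples):

import org.apache.spark.mllib.feature.HashingTF

// Use a small vector size (4,096 buckets) instead of the 2^20 default
val smallTf = new HashingTF(numFeatures = 1 << 12)

// Each word is hashed into one of the 4,096 buckets and the bucket
// frequencies are summed; "brewing" appears twice, so its bucket holds 2.0
val vector = smallTf.transform(Seq("brewing", "hops", "brewing", "yeast"))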

Even before any dimensionality reduction, which can be an expensive operation, the number of distinct terms (and hence the density of our vectors) can be greatly reduced by stemming and cleaning the text content. We use an Apache Lucene analyzer for this purpose:

<dependency>
  <groupId>org.apache.lucene</groupId>
  <artifactId>lucene-analyzers-common</artifactId>
  <version>4.10.1</version>
</dependency>

We remove all punctuation and numbers and feed the plain text object to a Lucene analyzer, collecting each clean word as a CharTermAttribute:

import java.io.StringReader

import org.apache.lucene.analysis.en.EnglishAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.rdd.RDD

def stem(rdd: RDD[(String, Array[String])]) = {

  val replacePunc = """\W""".r
  val replaceDigitOnly = """\s\d+\s""".r

  rdd mapPartitions { it =>

    // One analyzer per partition; Lucene analyzers are not serializable
    val analyzer = new EnglishAnalyzer

    it map { case (body, tags) =>

      // Remove punctuation and standalone numbers
      val content1 = replacePunc.replaceAllIn(body, " ")
      val content = replaceDigitOnly.replaceAllIn(content1, " ")

      // Tokenize and stem through the Lucene English analyzer
      val tReader = new StringReader(content)
      val tStream = analyzer.tokenStream("contents", tReader)
      val term = tStream.addAttribute(classOf[CharTermAttribute])
      tStream.reset()

      // Keep clean terms longer than 3 characters, discarding any term
      // that still contains a digit
      val terms = collection.mutable.MutableList[String]()
      while (tStream.incrementToken) {
        val clean = term.toString
        if (!clean.matches(".*\\d.*") && clean.length > 3) {
          terms += clean
        }
      }
      tStream.close()
      (terms.toArray, tags)
    }
  }
}

With this approach, we transform the text [Mastering Spark for Data Science - V1] into [master spark data science], hence reducing the number of words (and therefore the number of dimensions) in our input vectors. Finally, we normalize our term frequency vectors using the MLlib Normalizer class:

import org.apache.spark.mllib.feature.{HashingTF, Normalizer}
import org.apache.spark.mllib.regression.LabeledPoint

import scala.collection.mutable

val hashingTf = new HashingTF()
val normalizer = new Normalizer()

// Stem the (body, tags) pairs extracted from our brewing DataFrame
val labeledCorpus = stem(brewing.rdd map { row =>
  val body = row.getString(0)
  val tags = row.getAs[mutable.WrappedArray[String]](1)
  (body, tags.toArray)
})

// Build one LabeledPoint per (article, tag) combination, using the
// broadcast label dictionary to turn tags into numeric labels
val labeledPoints = labeledCorpus flatMap { case (corpus, tags) =>
  val vector = hashingTf.transform(corpus)
  val normVector = normalizer.transform(vector)
  tags map { tag =>
    val label = bLabelMap.value.getOrElse(tag, 0.0d)
    LabeledPoint(label, normVector)
  }
}

Tip

Hash functions can lead to dramatic overestimates of term frequencies due to collisions (two words with completely different meanings may share the same hash value). We will discuss the Random Indexing technique in Chapter 10, Story De-duplication and Mutation, which limits the number of collisions while preserving the distance measure.

Training a Naive Bayes model

We train a Naive Bayes model as follows and test our classifier using a test dataset that we did not include in the training data points. We finally display the first five predictions in the following example. The labels on the left-hand side are the original labels from our test content; on the right-hand side are the results of the Naive Bayes classification. Four of the five predictions match the original labels; only ipa has been predicted as hangover, a misclassification that is arguably not too far from the truth:

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import spark.implicits._

labeledPoints.cache()
val model: NaiveBayesModel = NaiveBayes.train(labeledPoints)
labeledPoints.unpersist(blocking = false)

// testPoints / testLabels hold the feature vectors and original tags of a
// held-out split that was not used for training
model
  .predict(testPoints)
  .map { prediction =>
    bLabelMap.value.map(_.swap).getOrElse(prediction, "unknown")
  }
  .zip(testLabels)
  .map(_.swap)
  .toDF("original", "predicted")
  .show(5)

+---------+-----------+
| original|  predicted|
+---------+-----------+
|  brewing|    brewing|
|      ipa|   hangover|
| hangover|   hangover|
| drinking|   drinking|
| pilsener|   pilsener|
+---------+-----------+

For convenience, we abstract all these methods and expose the following ones within a Classifier object that will be used later:

def train(rdd: RDD[(String, Array[String])]): ClassifierModel
def predict(rdd: RDD[String]): RDD[String]
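
A hypothetical usage of these two methods could look as follows, assuming the brewing DataFrame built earlier in this chapter and a handful of unlabeled questions to tag (the sample question is made up for illustration):

// Rebuild the raw (body, tags) corpus from the brewing DataFrame
val corpus = brewing.rdd map { row =>
  (row.getString(0), row.getAs[Seq[String]](1).toArray)
}

// Train the bootstrapped model, then tag previously unseen text
val model = Classifier.train(corpus)
val unlabeled = sc.parallelize(Seq(
  "My lager turned cloudy after bottling, should I have stored it colder?"
))
Classifier.predict(unlabeled).collect().foreach(println)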

We have demonstrated how to extract labeled data from an external source, how to build term frequency vectors, and how to train a simple Naive Bayes classification model. The high-level workflow used here is represented in the following figure and is common to most classification use cases:

Figure 1: Classification workflow

The next step is to start classifying our original, unlabeled data (assuming our content is still brewing related). This concludes the introduction to Naive Bayes classification and to how a bootstrapped model can steal its ground truth from external resources. Both techniques will be used in our classification system in the following sections.

Laziness, impatience, and hubris

Here comes the second of the main challenges we face in our context of news articles. Even if someone spent days manually labeling data, this would only solve our classification problem for the categories known at a particular point in time, and would probably only hold when back-testing our data. Who knows what tomorrow's newspaper headlines will be; no one can define in advance all the fine-grained labels and topics that will be covered in the near future (although broader categories can still be defined). It would take a lot of effort to constantly re-evaluate, retrain, and redeploy our model whenever a new trending topic arises. As a concrete example, no one was talking about Brexit a year ago; that topic is now heavily mentioned in news articles.

From our experience, data scientists should bear in mind a famous quote from Larry Wall, inventor of the Perl programming language:

"We will encourage you to develop the three great virtues of a programmer, laziness, impatience and hubris".

  • Laziness makes you go to great effort to reduce overall energy expenditure
  • Impatience makes you write programs that don't just react to your needs but anticipate them
  • Hubris makes you write programs that other people won't want to say bad things about

We want to avoid the effort of both preparing and maintaining a classification model (laziness), and we want to programmatically anticipate emerging topics (impatience), even though this may sound like an ambitious task (but what is hubris if not excessive pride in attempting the seemingly impossible?). Social networks are a fantastic place to steal ground truth from. In fact, when people tweet news articles, they unconsciously help us label our data. We do not need to pay for a Mechanical Turk when we potentially have millions of users doing the job for us. In other words, we crowdsource the labeling of GDELT data to Twitter users.

Any article mentioned on Twitter helps us build a term frequency vector, while the associated hashtags are used as its labels. In the following example, an adorable news story about President Obama meeting Prince George, who was wearing a bathrobe, has been classified as [#Obama] and [#Prince] (http://www.wfmynews2.com/entertainment/adorable-prince-george-misses-bedtime-meets-president-obama/149828772):

Figure 2: President Obama meets Prince George, #Obama, #Prince

In the following example, we pay tribute to music's great losses of 2016, with the topics [#DavidBowie], [#Prince], [#GeorgeMichael], and [#LeonardCohen] all learned from the same news article from The Guardian (https://www.theguardian.com/music/2016/dec/29/death-stars-musics-greatest-losses-of-2016):

Figure 3: Music's great losses in 2016 - source

Using this approach, our algorithm is constantly and automatically re-trained, learning emerging topics on its own, and hence operates in a non-supervised way (even though the underlying algorithm is, strictly speaking, a supervised one).
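To give a flavour of what this labeling step looks like (the full Spark Streaming pipeline is assembled later in this chapter), the following sketch pairs the expanded article URLs of a Twitter4J Status with the hashtags of that same tweet; the helper name is ours and purely illustrative:

import twitter4j.Status

// Illustrative sketch: for a single tweet, pair every shared article URL
// with the tweet's hashtags. The article text (fetched later with Goose)
// provides the term frequency vector, the hashtags provide free labels.
def extractLabeledUrls(status: Status): Array[(String, Array[String])] = {
  val hashtags = status.getHashtagEntities.map(_.getText.toLowerCase)
  val urls = status.getURLEntities.map(_.getExpandedURL)
  if (hashtags.isEmpty) Array.empty[(String, Array[String])]
  else urls.map(url => (url, hashtags))
}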
