Chapter 10. Story De-duplication and Mutation

How large is the World Wide Web? Although its exact size is almost impossible to know (not to mention the Deep and Dark Web), it was estimated to hold more than a trillion pages in 2008, which, in the data era, was more or less the Middle Ages. Almost a decade later, it is safe to assume that the Internet's collective brain has more neurons than the actual gray matter stuffed between our ears. But out of these trillion-plus URLs, how many web pages are truly identical, similar, or covering the same topic?

In this chapter, we will de-duplicate and index the GDELT database into stories. Then, we will track stories over time and understand the links between them, how they may mutate and if they could lead to any subsequent event in the near future.

We will cover the following topics:

  • Understand the concept of Simhash to detect near duplicates
  • Build an online de-duplication API
  • Build vectors using TF-IDF and reduce dimensionality using Random Indexing
  • Build story connections in pseudo real-time using Streaming KMeans

Detecting near duplicates

While this chapter is about grouping articles into stories, this first section is all about detecting near duplicates. Before delving into the de-duplication algorithm itself, it is worth introducing the notion of story and de-duplication in the context of news articles. Given two distinct articles - by distinct we mean two different URLs - we may observe the following scenarios:

  • The URL of article 1 actually redirects to article 2 or is an extension of the URL provided in article 2 (some additional URL parameters, for instance, or a shortened URL). Both articles with the same content are considered as true duplicates although their URLs are different.
  • Both article 1 and article 2 are covering the exact same event, but could have been written by two different publishers. They share lots of content in common, but are not truly identical. Based on certain rules explained hereafter, they might be considered as near-duplicates.
  • Both article 1 and article 2 are covering the same type of event. We observe major differences in style or different flavors of the same topic. They could be grouped into a common story.
  • Both article 1 and article 2 are covering two different events. Both contents are different and should not be grouped within the same story, nor should they be considered as near duplicates.

Facebook users must have noticed the related articles feature. When you like a news article, click on an article's link, or play an article's video, Facebook thinks this link is interesting and updates its timeline (or whatever it is called) to display more content that looks similar. In Figure 1, I was really amused to see the Samsung Galaxy Note 7 smartphone emitting smoke or catching fire, and therefore being banned from most US flights. Facebook automatically suggested similar articles about this Samsung fiasco. What probably happened is that by opening this link I queried the Facebook internal APIs and asked for similar content. Here comes the notion of finding near duplicates in real time, and this is what we will try to build in the first section.

Figure 1: Facebook suggesting related articles

First steps with hashing

Finding true duplicates is easy. Two articles will be considered as identical if their content is the same. But instead of comparing strings (which may be large and therefore inefficient to compare), we can compare their hash values just like one would compare handwritten signatures; two articles having the same signature should be considered as identical. A simple groupBy function detects the true duplicates in a string array, as follows:

Array("Hello Spark", "Hello Hadoop", "Hello Spark")
  .groupBy(a => Integer.toBinaryString(a.hashCode))
  .foreach(println)

11001100010111100111000111001111 List(Hello Spark, Hello Spark)
10101011110110000110101101110011 List(Hello Hadoop)

But even the most complex hash function leads to some collisions. Java's built-in hashCode function encodes a string into a 32-bit integer, which means that, in theory, we only have 2^32 distinct hash values available before different words must share the same one. In practice, collisions should always be handled carefully because, according to the birthday paradox, they appear much more often than once every 2^32 values. To prove our point, the following example considers four different strings to be identical:

Array("AaAa", "BBBB", "AaBB", "BBAa")
  .groupBy(a => Integer.toBinaryString(a.hashCode))
  .foreach(println)

111110000000010000000 List(AaAa, BBBB, AaBB, BBAa)
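
The birthday paradox mentioned above can be quantified with a short sketch. It relies on the usual approximation p ≈ 1 - exp(-n(n-1) / (2 · 2^32)) and assumes a perfectly uniform 32-bit hash, which hashCode is not, so real-world collisions are likely to be more frequent. The collisionProbability helper is a hypothetical name introduced for this illustration only.

```scala
// Approximate probability that at least two of n items collide when hashed
// uniformly into 2^bits buckets (birthday paradox approximation).
// Assumption: a perfectly uniform hash, which String.hashCode is not.
def collisionProbability(n: Long, bits: Int = 32): Double = {
  val buckets = math.pow(2, bits)
  1.0 - math.exp(-(n.toDouble * (n - 1)) / (2.0 * buckets))
}

Seq(1000L, 10000L, 77163L, 1000000L).foreach { n =>
  println(f"$n%8d items -> ${collisionProbability(n) * 100}%.2f%% chance of a collision")
}
```

Under that assumption, roughly 77,000 strings are enough to reach an even chance of at least one collision, far fewer than the 2^32 values the hash can take.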

Also, some articles may differ only by a very small portion of text, for example, a piece of advertisement, an additional footer, or an extra bit of HTML code, which is enough to make the hash signatures of almost identical content different. In fact, even a minor typo in one single word would result in a totally different hash value, causing two near-duplicate articles to be considered as totally different.

Array("Hello, Spark", "Hello Spark")
  .groupBy(a => Integer.toBinaryString(a.hashCode))
  .foreach(println)

11100001101000010101000011010111  List(Hello, Spark)
11001100010111100111000111001111  List(Hello Spark)

Although the strings Hello Spark and Hello, Spark are really close (they only differ by 1 character), their hash values differ by 16 bits (out of 32). Luckily, the elders of the Internet may have found a solution to detect near duplicates using hash values.

Standing on the shoulders of the Internet giants

Needless to say, Google is fairly good at indexing web pages. With more than a trillion distinct URLs, detecting duplicates is key when it comes to indexing web content. Surely the Internet giants must have developed techniques over the years to solve this problem of scale, hence limiting the amount of computing resources needed to index the whole Internet. One of these techniques, described here, is called Simhash and is so simple and neat, yet so efficient, that it is worth knowing if you truly want to Master Spark for data science.

Note

More information about Simhash can be found at http://www.wwwconference.org/www2007/papers/paper215.pdf.

Simhashing

The main idea behind Simhash is not to compute one single hash value at once, but rather to look at the article's content and compute multiple individual hashes. For each word, each pair of words, or even each two-character shingle, we can easily compute hash values using the simple Java built-in hashCode function described earlier. In Figure 2, we report all the 32-bit hash values (with the first 20 zero bits omitted) of the two-character shingles included in the string hello simhash:

Figure 2: Building hello simhash shingles

A simple Scala implementation is reported next:

def shingles(content: String) = {
  // "\\s+" is the escaped regex for one or more whitespace characters
  content.replaceAll("\\s+", "")
    .sliding(2)
    .map(s => (s, s.hashCode))
}
 
implicit class BitOperations(i1: Int) {
  def toHashString: String = {
    String.format(
      "%32s",
      Integer.toBinaryString(i1)
    ).replace(" ", "0")
  }
}

shingles("spark").foreach { case (shingle, hash) =>
  println("[" + shingle + "]	" + hash.toHashString)
}

[sp]  00000000000000000000111001011101
[pa]  00000000000000000000110111110001
[ar]  00000000000000000000110000110001
[rk]  00000000000000000000111000111001
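
The same idea applies to words or pairs of words rather than characters. The following is a minimal sketch of word-level shingles; wordShingles and its parameter n are hypothetical names introduced here for illustration and are not part of the chapter's code base.

```scala
// Hypothetical helper: shingles made of n consecutive words instead of
// two consecutive characters. Each shingle is paired with its hashCode.
def wordShingles(content: String, n: Int = 2): List[(String, Int)] = {
  content.toLowerCase
    .split("\\s+")
    .filter(_.nonEmpty)
    .sliding(n)
    .map(_.mkString(" "))
    .map(s => (s, s.hashCode))
    .toList
}

wordShingles("mastering spark for data science").foreach { case (shingle, hash) =>
  println("[" + shingle + "]\t" + hash)
}
```

Larger shingles capture more context but are more sensitive to small edits, so the choice between characters, words, and word pairs is a trade-off worth testing on real data.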

With all these hash values computed, we initialize a Simhash object as a zero integer. For each bit of that 32-bit integer, we count the number of hash values in our list that have this particular bit set to 1 and subtract the number of hash values that do not. This gives us the array reported in Figure 3. Finally, any value greater than 0 is set to 1, and any value less than or equal to 0 is left as 0. The only tricky part here is the bit shifting operations, but the algorithm itself is fairly trivial. Note that we use recursion here to avoid mutable variables (using var) or lists.

Figure 3: Building hello simhash

implicit class BitOperations(i1: Int) {
  
  // ../.. 

  def isBitSet(bit: Int): Boolean = {
    ((i1 >> bit) & 1) == 1
  }
}

implicit class Simhash(content: String) {
 
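  def simhash = {
    // shingles returns (shingle, hash) pairs; only the hash is needed here.
    // The iterator is materialized so that groupBy can be applied.
    val aggHash = shingles(content).toList.flatMap { case (_, hash) =>
      Range(0, 32).map { bit =>
        (bit, if (hash.isBitSet(bit)) 1 else -1)
      }
    }
    .groupBy(_._1)
    .mapValues(_.map(_._2).sum > 0)
    .toArray

    buildSimhash(0, aggHash)
  }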

 private def buildSimhash(
      simhash: Int,
      aggBit: Array[(Int, Boolean)]
     ): Int = {

    if(aggBit.isEmpty) return simhash
    val (bit, isSet) = aggBit.head
    val newSimhash = if(isSet) {
      simhash | (1 << bit)
    } else {
      simhash
    }
    buildSimhash(newSimhash, aggBit.tail)

  }
}

val s = "mastering spark for data science"
println(s.simhash.toHashString)
 
00000000000000000000110000110001
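
For readers who prefer to see the bit counting spelled out, here is a compact, self-contained variant of the same algorithm written with foldLeft instead of recursion. It is only a sketch: shingleHashes and simhashWithCounts are hypothetical names, and the shingles are reduced to their hash codes.

```scala
// Two-character shingles reduced to their hash codes (whitespace removed).
def shingleHashes(content: String): List[Int] =
  content.replaceAll("\\s+", "").sliding(2).map(_.hashCode).toList

// For each of the 32 bits, add +1 per hash with that bit set and -1 otherwise;
// the bit is set in the resulting Simhash only when the score is positive.
def simhashWithCounts(content: String): Int = {
  val hashes = shingleHashes(content)
  (0 until 32).foldLeft(0) { (acc, bit) =>
    val score = hashes.map(h => if (((h >> bit) & 1) == 1) 1 else -1).sum
    if (score > 0) acc | (1 << bit) else acc
  }
}

println(Integer.toBinaryString(simhashWithCounts("hello simhash")))
```

With a single shingle, every score is either +1 or -1, so the Simhash is simply the hash of that shingle, which makes for an easy sanity check.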

The hamming weight

It is easy to understand that the more words two articles have in common, the more likely both of them are to share the same bit b set to 1 in their Simhash. But the beauty of Simhash comes with the aggregation step. Many other words in our corpus (hence other hashes) may not have this particular bit b set, which decreases the value of that bit as more different hashes are observed. Sharing a common set of words is not enough; similar articles must also share the same word frequency. The following example shows three Simhash values computed for the strings hello simhash, hello minhash, and hello world.

Figure 4: Comparing hello simhash

While hello simhash and hello world differ by 3 bits, hello simhash and hello minhash differ by only 1. In fact, we can express the distance between them as the hamming weight of their EXCLUSIVE OR (XOR). The hamming weight is the number of bits we need to change in order to turn a given number into the zero element. The hamming weight of the XOR of two numbers is therefore the number of bits that differ between these two elements, 1 in that case.

Figure 5: Hamming weight of hello simhash

We simply use Java's bitCount function that returns the number of one-bits in the two's complement binary representation of the specified integer value.

implicit class BitOperations(i1: Int) {

  // ../..
 
  def distance(i2: Int) = {
    Integer.bitCount(i1 ^ i2) 
  }
}
 
val s1 = "hello simhash"
val s2 = "hello minhash"
val dist = s1.simhash.distance(s2.simhash)
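
To make the XOR and bit count steps concrete, here is a tiny worked example on two 4-bit values chosen purely for illustration (they are not Simhash values from the chapter):

```scala
val a = Integer.parseInt("1011", 2)   // 11
val b = Integer.parseInt("1001", 2)   // 9

// XOR keeps a 1 only where the two numbers disagree
val diff = a ^ b

println(Integer.toBinaryString(diff)) // prints 10
println(Integer.bitCount(diff))       // prints 1
```

The two values disagree on a single bit, so their distance is 1; a value compared with itself always yields a distance of 0.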

We have been able to successfully build Simhash and perform some simple pairwise comparison. The next step is to scale this up and start detecting actual duplicates out of the GDELT database.

Detecting near duplicates in GDELT

We covered the data acquisition process in depth in Chapter 2, Data Acquisition. For this use case, we will use a NiFi flow in Figure 6 that listens to the GDELT master URL, fetches and unzips the latest GKG archive, and stores this file on HDFS in a compressed format.

Figure 6: Downloading GKG data

We first parse our GKG records using the set of parsers we created earlier (available in our GitHub repo), extract all the distinct URLs, and fetch the HTML content using the Goose extractor introduced in Chapter 6, Scraping Link-Based External Data.

import java.net.URL
import scala.util.Try

val gdeltInputDir = args.head
val gkgRDD = sc.textFile(gdeltInputDir)
  .map(GKGParser.toJsonGKGV2)
  .map(GKGParser.toCaseClass2)
 
val urlRDD = gkgRDD.map(g => g.documentId.getOrElse("NA"))
  .filter(url => Try(new URL(url)).isSuccess)
  .distinct()
  .repartition(partitions)

val contentRDD = urlRDD mapPartitions { it =>
  val html = new HtmlFetcher()
  it map html.fetch
}

Because the hashCode function is case sensitive (Spark and spark result in totally different hash values), it is strongly recommended to clean our text before applying the simhash function. Similar to what was described in Chapter 9, News Dictionary and Real-Time Tagging System, we first use the following Lucene analyzer to stem words:

<dependency>
  <groupId>org.apache.lucene</groupId>
  <artifactId>lucene-analyzers-common</artifactId>
  <version>4.10.1</version>
</dependency>
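
To illustrate why cleaning matters, the following sketch uses a deliberately simplified stand-in for the Lucene analyzer. The normalize function is a hypothetical helper that only lowercases, strips punctuation, and collapses whitespace; it does not stem words and it drops non-ASCII letters, so it should not be mistaken for the analyzer used in the book.

```scala
// Simplified stand-in for the Lucene analyzer (assumption: ASCII text only).
// Lowercases, replaces punctuation with spaces and collapses whitespace.
// It does NOT stem words.
def normalize(text: String): String =
  text.toLowerCase
    .replaceAll("[^a-z0-9\\s]", " ")
    .trim
    .replaceAll("\\s+", " ")

println("Spark".hashCode == "spark".hashCode)                    // false
println(normalize("Hello, Spark!") == normalize("hello spark"))  // true
```

Once both strings are normalized, they produce the same shingles and therefore the same Simhash.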

As you may have noticed earlier, we wrote our Simhash algorithm inside of an implicit class; we can apply our simhash function directly on a string using the following import statement. A bit of extra effort taken at an early stage of development always pays off.

import io.gzet.story.simhash.SimhashUtils._
val simhashRDD = corpusRDD.mapValues(_.simhash)

We now have an RDD of content (Content being a case class wrapping the article URL, title and body) together with its Simhash value and a unique identifier we may be using later. Let's first try to validate our algorithm and find our first duplicates. From now on, we only consider as duplicates the articles that have no more than 2-bits difference in their 32-bit Simhash values.

hamming match {
  case 0 => // identical articles - true-duplicate
  case 1 => // near-duplicate (mainly typo errors)
  case 2 => // near-duplicate (minor difference in style)
  case _ => // different articles
}
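
The rule above can be turned into a small runnable function. The Similarity type and the classify function are hypothetical names used for this sketch only.

```scala
sealed trait Similarity
case object TrueDuplicate extends Similarity
case object NearDuplicate extends Similarity
case object Different extends Similarity

// Classify two Simhash values according to the number of differing bits
def classify(simhash1: Int, simhash2: Int): Similarity =
  Integer.bitCount(simhash1 ^ simhash2) match {
    case 0     => TrueDuplicate
    case 1 | 2 => NearDuplicate
    case _     => Different
  }

println(classify(23423, 23423 ^ 3)) // two bits flipped: NearDuplicate
```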

But here comes a scalability challenge: we certainly do not want to execute a Cartesian product to compare pairwise articles from our Simhash RDD. Instead, we want to leverage the MapReduce paradigm (using a groupByKey function) and only group articles that are duplicates. Our approach follows an expand-and-conquer pattern where we first expand our initial dataset, leverage the Spark shuffle, and then solve our problem locally at the executor level. As we only need to deal with a 1-bit difference for now (we will then apply the same logic for 2 bits), our strategy is to expand our RDD so that for each Simhash s, we output all 32 values that differ from s by exactly 1 bit, using a set of 1-bit masks.

def oneBitMasks: Set[Int] = {
  (0 to 31).map(offset => 1 << offset).toSet
}

00000000000000000000000000000001
00000000000000000000000000000010
00000000000000000000000000000100
00000000000000000000000000001000
...

Taking a Simhash value s, we output the possible 1-bit combinations of s using a XOR between each preceding mask and the Simhash value s.

val s = 23423
oneBitMasks foreach { mask =>
  println((mask ^ s).toHashString)
}
 
00000000000000000101101101111110
00000000000000000101101101111101
00000000000000000101101101111011
00000000000000000101101101110111
...

Dealing with 2-bits is not that different, although a bit more aggressive in terms of scalability (we now have 496 possible combinations to output, meaning any combination of 2-bits out of 32).

def twoBitsMasks: Set[Int] = {
  val masks = oneBitMasks
  masks flatMap { e1 =>
    masks.filter( e2 => e1 != e2) map { e2 =>
      e1 | e2
    }
  }
}

00000000000000000000000000000011
00000000000000000000000000000101
00000000000000000000000000000110
00000000000000000000000000001001
...

Finally, we build our set of masks to apply (note that we also want to output the original Simhash by applying a 0-bit difference mask) in order to detect duplicates as follows:

val searchmasks = twoBitsMasks ++ oneBitMasks ++ Set(0) 
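
As a quick sanity check of the combinatorics, the following self-contained sketch rebuilds the three sets of masks and verifies their sizes. The for comprehension used for the 2-bit masks is just an equivalent way of writing the flatMap shown earlier.

```scala
def oneBitMasks: Set[Int] = (0 to 31).map(offset => 1 << offset).toSet

// Any combination of two distinct 1-bit masks
def twoBitsMasks: Set[Int] =
  for (e1 <- oneBitMasks; e2 <- oneBitMasks if e1 != e2) yield e1 | e2

val searchmasks = twoBitsMasks ++ oneBitMasks ++ Set(0)

println(oneBitMasks.size)   // 32
println(twoBitsMasks.size)  // 496
println(searchmasks.size)   // 529
// Every mask flips at most two bits
println(searchmasks.forall(mask => Integer.bitCount(mask) <= 2)) // true
```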

This also helps us expand our initial RDD accordingly. This surely is an expensive operation as it increases the size of our RDD by a constant factor (496 + 32 + 1 = 529 possible combinations), but it stays linear in terms of time complexity, while a Cartesian join is a quadratic operation, O(n^2).

val duplicateTupleRDD = simhashRDD.flatMap {
  case ((id, _), simhash) =>
    searchmasks.map { mask =>
      (simhash ^ mask, id)
    }
}
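
The expand-and-group pattern can be tried out on plain Scala collections before running it on Spark. The sketch below uses a hypothetical sample of four article identifiers and made-up Simhash values. Note one addition that is not part of the flow described above: because two expanded keys can coincide for Simhash values up to 4 bits apart (each side may have flipped two different bits), the sketch re-checks the actual distance of each candidate pair.

```scala
val masks: Set[Int] = {
  val one = (0 to 31).map(1 << _).toSet
  val two = for (a <- one; b <- one if a != b) yield a | b
  two ++ one ++ Set(0)
}

// Hypothetical sample: article id -> Simhash
val articles = Map(1L -> 23423, 2L -> (23423 ^ 1), 3L -> (23423 ^ 3), 4L -> 99999)

// Expand each Simhash into its 529 neighbours, then group the ids by key
val buckets: Map[Int, Set[Long]] = articles.toSeq
  .flatMap { case (id, simhash) => masks.toSeq.map(mask => (simhash ^ mask, id)) }
  .groupBy(_._1)
  .map { case (key, entries) => key -> entries.map(_._2).toSet }

// Candidate pairs share at least one key; the filter is this sketch's own
// extra check that the original Simhash values really are within 2 bits
val duplicatePairs = buckets.values
  .flatMap(ids => for (x <- ids; y <- ids if x < y) yield (x, y))
  .toSet
  .filter { case (x, y) => Integer.bitCount(articles(x) ^ articles(y)) <= 2 }

println(duplicatePairs)
```

Articles 1, 2, and 3 end up paired with each other, while article 4 never shares a key with any of them.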
.groupByKey()

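Because duplication chains together (we may find that article A is a duplicate of article B, which is itself a duplicate of article C), we want to group all three articles into the same story. This is a simple graph problem that can easily be solved through GraphX using a connected components algorithm.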

val edgeRDD = duplicateTupleRDD
  .values
  .flatMap { it =>
    val list = it.toList
    for (x <- list; y <- list) yield (x, y)
  }
  .filter { case (x, y) =>
    x != y
  }
  .distinct()
  .map {case (x, y) =>
    Edge(x, y, 0)
  }
 
val duplicateRDD = Graph.fromEdges(edgeRDD, 0L)
  .connectedComponents()
  .vertices
  .join(simhashRDD.keys)
  .values
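
For intuition, here is what connected components computes, written as a minimal sketch on a local list of edges rather than with GraphX. It repeatedly propagates the smallest identifier across edges until nothing changes, so every article ends up labeled with the lowest identifier of its group, which mirrors the labeling returned by the GraphX connectedComponents function. The connectedComponents helper below is a hypothetical local function, not the GraphX API.

```scala
// Local illustration only: label every vertex with the smallest vertex id
// reachable from it through the given (undirected) edges.
def connectedComponents(edges: Seq[(Long, Long)]): Map[Long, Long] = {
  val vertices = edges.flatMap { case (x, y) => Seq(x, y) }.distinct

  // Propagate the smallest label across each edge until labels are stable
  def loop(labels: Map[Long, Long]): Map[Long, Long] = {
    val updated = edges.foldLeft(labels) { case (acc, (x, y)) =>
      val min = math.min(acc(x), acc(y))
      acc.updated(x, min).updated(y, min)
    }
    if (updated == labels) labels else loop(updated)
  }

  loop(vertices.map(v => v -> v).toMap)
}

// A is a duplicate of B, B is a duplicate of C; D and E form another story
val stories = connectedComponents(Seq((3L, 2L), (2L, 1L), (7L, 8L)))
println(stories)
```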

Out of the 15,000 articles used for that test, we extracted around 3,000 different stories. We report an example in Figure 7, which includes two near-duplicate articles we were able to detect, both of them being highly similar but not truly identical.

Figure 7: Galaxy Note 7 fiasco from the GDELT database

Indexing the GDELT database

The next step is to start building our online API so that any user can detect near-duplicate events in real time just like Facebook does on a user's timeline. We use the Play Framework here, but we will keep the description short as this has already been covered in Chapter 8, Building a Recommendation System.

Persisting our RDDs

Firstly, we need to extract data out of our RDD and persist it somewhere that is reliable, scalable, and highly efficient for search by key. As the main purpose of that database is to retrieve an article given a specific key (key being the Simhash), Cassandra (maven dependency as follows) sounds like a good fit for that job.

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.11</artifactId>
</dependency>

Our data model is fairly simple and consists of one simple table:

CREATE TABLE gzet.articles (
  simhash int PRIMARY KEY,
  url text,
  title text,
  body text
);

The easiest way to store our RDD in Cassandra is to wrap our results in a case class that matches our earlier table definition and call the saveToCassandra function:

import com.datastax.spark.connector._

corpusRDD.map { case (content, simhash) =>
  Article(
    simhash,
    content.body,
    content.title,
    content.url
  )
}
.saveToCassandra(cassandraKeyspace, cassandraTable)

Building a REST API

The next step is to work on the API itself. We create a new maven module (packaged as play2) and import the following dependencies:

<packaging>play2</packaging>
 
<dependencies>
  <dependency>
    <groupId>com.typesafe.play</groupId>
    <artifactId>play_2.11</artifactId>
  </dependency>
  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
  </dependency>
</dependencies>

We first create a new data access layer, that, given an input Simhash, builds the list of all the possible 1-bit and 2-bit masks discussed earlier and pulls all the matching records from Cassandra:

import scala.collection.JavaConverters._

class CassandraDao() {
 
  private val session = Cluster.builder()
                               .addContactPoint(cassandraHost)
                               .withPort(cassandraPort)
                               .build()
                               .connect()
 
  def findDuplicates(hash: Int): List[Article] = {
    // flatMap keeps every matching row and skips the masks with no match
    // (calling head on an empty result would throw an exception)
    searchmasks.toList.flatMap { mask =>
      val searchHash = mask ^ hash
      val stmt = s"SELECT simhash, url, title, body FROM gzet.articles WHERE simhash = $searchHash;"
      // Convert the Java list returned by the driver into a Scala collection
      val results = session.execute(stmt).all().asScala
      results.map { row =>
        Article(
           row.getInt("simhash"),
           row.getString("body"),
           row.getString("title"),
           row.getString("url")
        )
      }
    }
  }
}

In our controller, given an input URL, we extract the HTML content, tokenize the text, build a Simhash value, and call our service layer to finally return our matching records in a JSON format.

object Simhash extends Controller {

  val dao = new CassandraDao()
  val goose = new HtmlFetcher()
 
  def detect = Action { implicit request =>
    val url = request.getQueryString("url").getOrElse("NA")
    val article = goose.fetch(url)
    val hash = Tokenizer.lucene(article.body).simhash
    val related = dao.findDuplicates(hash)
    Ok(
        Json.toJson(
          Duplicate(
            hash,
            article.body,
            article.title,
            url,
            related
          )
       )
    )
  }
}

The following play2 route will send any GET request on /simhash to the detect method we saw earlier:

GET /simhash io.gzet.story.web.controllers.Simhash.detect 

Finally, our API can be started and exposed to the end users as follows:

curl -XGET 'localhost:9000/simhash?url=http://www.detroitnews.com/story/tech/2016/10/12/samsung-damage/91948802/'

{
  "simhash": 1822083259,
  "body": "Seoul, South Korea - The fiasco of Samsung's [...]",
  "title": "Fiasco leaves Samsung's smartphone brand [...]",
  "url": "http://www.detroitnews.com/story/tech/2016/[...]",
  "related": [
    {
      "hash": 1821919419,
      "body": "SEOUL, South Korea - The fiasco of [...]",
      "title": "Note 7 fiasco leaves Samsung's [...]",
      "url": "http://www.chron.com/business/technology/[...]"
    },
    {
      "hash": -325433157,
      "body": "The fiasco of Samsung's fire-prone [...]",
      "title": "Samsung's Smartphone Brand [...]",
      "url": "http://www.toptechnews.com/[...]"
    }
  ]
}

Congratulations! You have now built an online API that can be used to detect near-duplicates such as the ones around the Galaxy Note 7 fiasco. But how accurate is our API compared to the one from Facebook? It surely is accurate enough to start de-noising GDELT data by grouping highly similar events into stories.

Area of improvement

Although we are already satisfied with the overall quality of the results returned by our API, here we discuss a major improvement in the context of news articles. Articles are not just bags of words; they follow a clear structure where the order truly matters. The title is always a punch line, and the main content is usually well covered within the first few lines. The rest of the article does matter, but may not be as important as the introduction. Given that assumption, we can slightly modify our Simhash algorithm to take the order into account by attributing a different weight to each word.

implicit class Simhash(content: String) {
 
  // ../..
   
  def weightedSimhash = {

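    // Keep only the hash of each (shingle, hash) pair and materialize the
    // iterator so that it can be traversed twice (length, then zipWithIndex)
    val features = shingles(content).map(_._2).toArray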
    val totalWords = features.length
    val aggHashWeight = features.zipWithIndex
      .map {case (hash, id) =>
        (hash, 1.0 - id / totalWords.toDouble)
      }
      .flatMap { case (hash, weight) =>
        Range(0, 32).map { bit =>
          (bit, if(hash.isBitSet(bit)) weight else -weight)
        }
      }
      .groupBy(_._1)
      .mapValues(_.map(_._2).sum > 0)
      .toArray

    buildSimhash(0, aggHashWeight)
  }
 
}

Instead of adding 1 or -1 any time a given bit is set or not, we add the corresponding weight of that word according to its position in the article. Similar articles will have to share the same words and the same word frequency, but also a similar structure. In other words, we are much less permissive about differences occurring in the first few lines of text than about those at the very bottom of each article.
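
To see how the weights evolve with position, here is a short worked sketch of the formula 1.0 - id / totalWords used in weightedSimhash. The positionWeights function is a hypothetical helper written only to print these values.

```scala
// Position-based weights: 1.0 for the first shingle, then decreasing
// linearly towards (but never reaching) 0 for the last one.
def positionWeights(totalWords: Int): Seq[Double] =
  (0 until totalWords).map(id => 1.0 - id / totalWords.toDouble)

println(positionWeights(4).mkString(", ")) // 1.0, 0.75, 0.5, 0.25
```

With four shingles, the first one counts four times as much as the last one when deciding whether a bit is set.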
