Name de-duplication

As we pull entities from an NLP extraction process without any validation, the names we retrieve may be written in many different ways: the words may appear in a different order, and names might contain middle names or initials, a salutation or a nobility title, nicknames, or even typos and spelling mistakes. Although we do not aim to fully de-duplicate the content (such as learning that both Ziggy Stardust and David Bowie stand for the same person), we will introduce two simple techniques used to de-duplicate a large amount of data at minimal cost by combining the MapReduce paradigm with functional programming concepts.

Functional programming with Scalaz

This section is all about enriching data as part of an ingestion pipeline. We are therefore less interested in building the most accurate system using advanced machine learning techniques than in building the most scalable and efficient one. We want to keep a dictionary of alternative names for each record, to merge and update these dictionaries quickly, with the least possible code, and at very large scale. We want these structures to behave like monoids, associative algebraic structures well supported by Scalaz (https://github.com/scalaz/scalaz), a library for pure functional programming in Scala:

<dependency>
  <groupId>org.scalaz</groupId>
  <artifactId>scalaz-core_2.11</artifactId>
  <version>7.2.0</version>
</dependency>
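
To see why monoids matter here, note that Scalaz ships with a Semigroup instance for Map[String, Int] whose |+| operator merges two maps and sums the counts of any colliding keys; this is exactly the merge operation our term frequency dictionaries rely on. A minimal illustration (the values below are made up for the example):

import scalaz.Scalaz._

// Two partial term frequency maps for the same person...
val tf1 = Map("David Bowie" -> 2, "david bowie" -> 1)
val tf2 = Map("David Bowie" -> 2, "david#Bowie" -> 1)

// ...merged with |+|: colliding keys have their counts summed.
// Because the operation is associative, it can safely be used
// inside a distributed reduce.
val merged = tf1 |+| tf2

// Map(David Bowie -> 4, david bowie -> 1, david#Bowie -> 1)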

Our de-duplication strategy

We use the simple example below to justify the need for Scalaz when building a scalable de-duplication pipeline made of multiple transformations. We take an RDD of person names, personRDD, as a test dataset:

personRDD.take(8).foreach(println)

/*
David Bowie
david bowie
david#Bowie
David Bowie
david bowie
David Bowie
David Bowie
Ziggy Stardust
*/

First, we count the number of occurrences of each entry. This is in fact a simple word count, the 101 of MapReduce programming:

val wcRDD = personRDD
  .map(_ -> 1)
  .reduceByKey(_+_)

wcRDD.collect.foreach(println)
 
/*
(David Bowie, 4)
(david bowie, 2)
(david#Bowie, 1)
(Ziggy Stardust, 1)
*/

Next, we apply a first transformation, lowercasing, and produce an updated report:

val lcRDD = wcRDD.map { case (p, tf) =>
  (p.toLowerCase(), tf)
}
.reduceByKey(_+_)
 
lcRDD.collect.foreach(println) 
 
/* 
(david bowie, 6) 
(david#bowie, 1) 
(ziggy stardust, 1) 
*/ 

Then, we apply a second transformation that replaces any special character with a space:

val reRDD = lcRDD.map { case (p, tf) =>
  (p.replaceAll("[^a-z]+", " ").trim, tf)
}
.reduceByKey(_+_)

reRDD.collect.foreach(println)

/*
(david bowie, 7)
(ziggy stardust, 1)
*/

We have now reduced our four distinct entries to only two, but since we've lost the original records across our transformations, we cannot build a dictionary in the form of [original value] -> [new value].

Using the mappend operator

Instead, using the Scalaz API, we initialize a name frequency dictionary (as a Map, with each original record mapped to a count of 1) upfront for each record, and merge these dictionaries using the mappend function (accessed through the |+| operator). The merge occurs after each transformation, within a reduceByKey function, taking the result of the transformation as a key and the term frequency map as a value:

import org.apache.spark.rdd.RDD
import scalaz.Scalaz._
 
def initialize(rdd: RDD[String]) = {
  rdd.map(s => (s, Map(s -> 1)))
     .reduceByKey(_ |+| _)
}
 
def lcDedup(rdd: RDD[(String, Map[String, Int])]) = {
  rdd.map { case (name, tf) =>
    (name.toLowerCase(), tf)
  }
  .reduceByKey(_ |+| _)
}

def reDedup(rdd: RDD[(String, Map[String, Int])]) = {
  rdd.map { case (name, tf) =>
    (name.replaceAll("\\W+", " ").trim, tf)
  }
  .reduceByKey(_ |+| _)
}

val wcTfRdd = initialize(personRDD)
val lcTfRdd = lcDedup(wcTfRdd)
val reTfRdd = reDedup(lcTfRdd)

reTfRdd.values.collect.foreach(println)

/*
Map(David Bowie -> 4, david bowie -> 2, david#Bowie -> 1)
Map(ziggy stardust -> 1)
*/

For each de-duplicated entry, we find the most frequent original name and build our dictionary RDD as follows:

val dicRDD = reTfRdd.values.flatMap {
  alternatives =>
    // The preferred name is the most frequent original record
    val top = alternatives.toList.sortBy(_._2).last._1
    alternatives.filter(_._1 != top).map { case (alternative, _) =>
      (alternative, top)
    }
}

dicRDD.collect.foreach(println)

/*
david bowie, David Bowie
david#Bowie, David Bowie
*/

In order to fully de-duplicate our person RDD, we need to replace all david bowie and david#Bowie occurrences with David Bowie, as shown in the sketch below. Now that we have explained the de-duplication strategy itself, let us dive into the set of transformations.
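
A minimal sketch of that final replacement, joining the original personRDD against the dicRDD dictionary built above (any record without an entry in the dictionary is kept unchanged):

val dedupPersonRDD = personRDD
  .map(name => (name, 1))
  .leftOuterJoin(dicRDD)
  .map { case (name, (_, optTop)) =>
    // Replace the name with its preferred alternative, if any
    optTop.getOrElse(name)
  }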

Simple clean

The first de-duplication transformation is obviously to clean names of all their fuzzy characters and extra spaces. We replace accented characters with their ASCII equivalents, handle camel case properly, and remove any stop words such as [mr, miss, sir]. Applying this function to the prime minister of Tonga, [Mr. Sialeʻataongo Tuʻivakanō], returns [siale ataongo tu ivakano], a much cleaner version, at least in the context of string de-duplication. Executing the de-duplication itself then takes just a few lines of code, using both the MapReduce paradigm and the monoid concept introduced earlier:

import org.apache.commons.lang3.StringUtils

def clean(name: String, stopWords: Set[String]) = {
  // Replace accented characters with their ASCII equivalents,
  // drop stop words, split camel case and discard single letters
  StringUtils.stripAccents(name)
    .split("\\W+").map(_.trim).filter { part =>
      !stopWords.contains(part.toLowerCase())
    }
    .mkString(" ")
    .split("(?<=[a-z])(?=[A-Z])")
    .filter(_.length >= 2)
    .mkString(" ")
    .toLowerCase()
}
 
def simpleDedup(rdd: RDD[(String, Map[String, Int])], stopWords: Set[String]) = {

  rdd.map { case (name, tf) =>
    (clean(name, stopWords), tf)
  }
  .reduceByKey(_ |+| _)
 
}

DoubleMetaphone

DoubleMetaphone is a useful algorithm that indexes names by their English pronunciation. Although it does not produce an exact phonetic representation of a name, it acts as a simple hash function that can be used to group names with similar phonemes.

Note

For more information about the DoubleMetaphone algorithm, please refer to: Philips, L. (1990). Hanging on the Metaphone. Computer Language, Vol. 7.

We turn to this algorithm for performance reasons, as finding potential typos and spelling mistakes in large dictionaries is usually an expensive operation; it often requires each candidate name to be compared with every other name we are tracking. This type of comparison is challenging in a big data environment as it usually requires a Cartesian join, which can generate excessively large intermediate datasets. The metaphone algorithm offers a much faster, if less exact, alternative.
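
For contrast, a naive pairwise approach might look like the sketch below. The LevenshteinDistance helper is borrowed from Apache commons-text (an assumed extra dependency; any edit distance implementation would do) and the threshold of 2 is arbitrary; the point is that the cartesian call alone materializes O(n²) candidate pairs:

import org.apache.commons.text.similarity.LevenshteinDistance

// Naive pairwise matching (what we are avoiding): the Cartesian
// join produces every pair of names before any filtering occurs.
val candidatePairs = personRDD.cartesian(personRDD)
  .filter { case (a, b) => a < b } // drop self and mirrored pairs
  .filter { case (a, b) =>
    LevenshteinDistance.getDefaultInstance.apply(a, b) <= 2
  }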

Using the DoubleMetaphone class from the Apache Commons Codec package, we simply leverage the MapReduce paradigm by grouping names that share the same pronunciation. [david bowie], [david bowi] and [davide bowie], for example, all share the same code [TFT#P] and will all be grouped together. In the example below, we compute the double metaphone hash for each record and call reduceByKey to merge and update all of our name frequency maps:

import org.apache.commons.codec.language.DoubleMetaphone

def metaphone(name: String) = {
  val dm = new DoubleMetaphone()
  name.split("\\s+")
    .map(dm.doubleMetaphone) // hash each word by its pronunciation
    .mkString("#")
}
 
def metaphoneDedup(rdd: RDD[(String, Map[String, Int])]) = {
  rdd.map { case (name, tf) =>
    (metaphone(name), tf)
  }
  .reduceByKey(_ |+| _)
}

We can also greatly improve this simple technique by keeping a list of common English nicknames (bill, bob, will, beth, al, and so on) and their associated primary names, so that we can match across non-phonetic synonyms. We can do this by pre-processing our name RDD, replacing the hash codes of known nicknames with the hash codes of their associated primary names, and then running the same de-duplication algorithm to resolve duplicates across both phonetic and synonym-based matches. This detects both spelling mistakes and alternative nicknames, as shown below:
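
The metaphoneAndNickNames helper used below is not shown in full; a minimal sketch, assuming a small hand-built nickname map, could look like the following:

// Hypothetical nickname map; a production version would cover
// many more common English nicknames
val nicknames = Map(
  "dave" -> "david",
  "bill" -> "william",
  "bob"  -> "robert",
  "beth" -> "elizabeth"
)

def metaphoneAndNickNames(name: String) = {
  val dm = new DoubleMetaphone()
  name.split("\\s+")
    .map(_.toLowerCase())
    .map(part => nicknames.getOrElse(part, part)) // nickname -> primary
    .map(dm.doubleMetaphone)
    .mkString("#")
}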

persons.foreach(p => println(p + "\t" + metaphoneAndNickNames(p)))

/*
David Bowie  TFT#P
David Bowi   TFT#P
Dave Bowie   TFT#P
*/

Once again, we want to highlight the fact that this algorithm (and the simple cleansing routine shown above) will not be as accurate as a proper fuzzy string matching approach that would, for example, compute a Levenshtein distance between each possible pair of names. By sacrificing accuracy, however, we create a method that is highly scalable and that finds most common spelling mistakes at minimal cost, especially spelling mistakes made on silent consonants. Once all the alternative names have been grouped by their resulting hash codes, we output the best alternative as the most frequent name in our term frequency objects. This best alternative is applied through a join with the initial name RDD in order to replace any record with its preferred alternative (if any):

def getBestNameRdd(rdd: RDD[(String, Map[String, Int])]) = {
  rdd.flatMap { case (key, tf) =>
    val bestName = tf.toSeq.sortBy(_._2).last._1
    tf.keySet.map { altName =>
      (altName, bestName)
    } 
  }
}
 
val bestNameRdd = getBestNameRdd(nameTfRdd)

val dedupRdd = nameRdd
  .map(_ -> 1)
  .leftOuterJoin(bestNameRdd)
  .map { case (name, (dummy, optBest)) =>
    optBest.getOrElse(name)
  }