Named entity recognition

Building a web scraper that enriches an input dataset of URLs with external, web-based HTML content is of great business value within a big data ingestion service. But while an average data scientist can study the returned content using basic clustering and classification techniques, an expert data scientist will take this data enrichment process to the next level by further enriching it and adding value in post-processing. Commonly, these value-added post-processes include disambiguating the external text content, extracting entities (such as people, places, and dates), and converting raw text into its simplest grammatical form. This section explains how to leverage the Spark framework to create a reliable Natural Language Processing (NLP) pipeline that produces these valuable post-processed outputs and handles English-language content at any scale.

Scala libraries

ScalaNLP (http://www.scalanlp.org/) is the parent project of breeze (among others), a numerical computation framework heavily used in Spark MLlib. This library would have been the perfect candidate for NLP on Spark were it not for the dependency conflicts between the different versions of breeze and epic. Overcoming these core dependency mismatches would require recompiling either the entire Spark distribution or the full ScalaNLP stack, neither of which is a walk in the park. Our preferred candidate is instead a suite of natural language processors from the Computational Language Understanding Lab (https://github.com/clulab/processors). Written in Scala 2.11, it provides three different APIs: a Stanford CoreNLP processor, a fast processor, and one for processing biomedical text. Within this library, we can use the FastNLPProcessor, which is accurate enough for basic Named Entity Recognition (NER) and licensed under Apache v2.

<dependency>
  <groupId>org.clulab</groupId>
  <artifactId>processors-corenlp_2.11</artifactId>
  <version>6.0.1</version>
</dependency>

<dependency>
  <groupId>org.clulab</groupId>
  <artifactId>processors-main_2.11</artifactId>
  <version>6.0.1</version>
</dependency>

<dependency>
  <groupId>org.clulab</groupId>
  <artifactId>processors-models_2.11</artifactId>
  <version>6.0.1</version>
</dependency>

NLP walkthrough

An NLP processor annotates a document and returns a list of lemmas (words in their simplest grammatical form), a list of named entity types such as [ORGANIZATION], [LOCATION], or [PERSON], and a list of normalized entities (such as actual date values).
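As a minimal sketch of these three annotation layers (the field names are those of the clulab Sentence class, as we understand the API; the sample sentence is our own):

import org.clulab.processors.fastnlp.FastNLPProcessor

val processor = new FastNLPProcessor()
val doc = processor.annotate("Wikipedia was launched on 15 January 2001")

doc.sentences foreach { sentence =>
  println(sentence.words.mkString(" "))          // raw tokens
  println(sentence.lemmas.map(_.mkString(" ")))  // lemmatized forms
  println(sentence.entities.map(_.mkString(" "))) // NER labels, e.g. DATE
  println(sentence.norms.map(_.mkString(" ")))   // normalized values, e.g. an ISO date
}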

Extracting entities

In the following example, we initialize a FastNLPProcessor object, annotate and tokenize the document into a list of Sentence objects, zip the lemmas with their NER types, and finally return a list of recognized entities for each sentence.

case class Entity(eType: String, eVal: String)

def processSentence(sentence: Sentence): List[Entity] = {
  sentence.lemmas.get
    .zip(sentence.entities.get)
    .map { case (eVal, eType) =>
      Entity(eType, eVal)
    }
    .toList
}

def extractEntities(processor: Processor, corpus: String): Array[List[Entity]] = {
  val doc = processor.annotate(corpus)
  doc.sentences map processSentence
}

val t = "David Bowie was born in London"
val processor: Processor = new FastNLPProcessor()
val sentences = extractEntities(processor, t)

sentences foreach { sentence =>
  sentence foreach println
}
 
/*
Entity(PERSON,David)
Entity(PERSON,Bowie)
Entity(O,was)
Entity(O,born)
Entity(O,in)
Entity(LOCATION,London)
*/

Looking at the above output, you may notice that the retrieved entities are not linked together: David and Bowie are two distinct entities of type [PERSON]. We recursively aggregate consecutive entities of the same type using the following methods.

def aggregate(entities: Array[Entity]) = {
  aggEntities(entities.head, entities.tail, List()).reverse
}

def aggregateEntity(e1: Entity, e2: Entity) = {
  Entity(e1.eType, e1.eVal + " " + e2.eVal)
}

def aggEntities(current: Entity, entities: Array[Entity], processed: List[Entity]): List[Entity] = {
  if(entities.isEmpty) {
    // End of recursion, no additional entity to process
    // Append our last un-processed entity to our list
    current :: processed
  } else {
    val entity = entities.head
    if(entity.eType == current.eType) {
      // Aggregate consecutive values of the same entity type
      val aggEntity = aggregateEntity(current, entity)
      // Process next record
      aggEntities(aggEntity, entities.tail, processed)
    } else {
      // Add current entity as a candidate for the next aggregation
      // Append our previous un-processed entity to our list
      aggEntities(entity, entities.tail, current :: processed)
    }
  }
}
 
def processSentence(sentence: Sentence): List[Entity] = {
  val entities = sentence.lemmas.get
    .zip(sentence.entities.get)
    .map {
      case (eVal, eType) =>
        Entity(eType, eVal)
    }
  aggregate(entities)
}

Printing out the same content now gives us a much more consistent output.

/*
Entity(PERSON,David Bowie)
Entity(O,was born in)
Entity(LOCATION,London)
*/

Tip

In a functional programming context, try to limit the use of mutable objects (such as var). As a rule of thumb, mutable state can usually be avoided with recursive functions such as the preceding ones.
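As a minimal illustration, the accumulator-based sum below follows the same pattern as aggEntities; the @tailrec annotation asks the compiler to verify that the recursion is in tail position and can run in constant stack space (it applies equally well to aggEntities):

import scala.annotation.tailrec

// Same accumulator pattern as aggEntities: the recursive call is the very
// last operation, so the compiler rewrites it into a simple loop
@tailrec
def sum(xs: List[Int], acc: Int = 0): Int = xs match {
  case Nil          => acc
  case head :: tail => sum(tail, acc + head)
}

sum(List(1, 2, 3)) // returns 6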

Abstracting methods

We appreciate that working on an array of sentences (sentences themselves being lists of entities) may sound quite confusing. In our experience, it gets much worse at scale, when several nested flatMap functions are required for what should be a simple transformation on an RDD. We therefore wrap the results into an Entities class and expose the following methods:

case class Entities(sentences: Array[List[Entity]]) {

  def getSentences = sentences

  def getEntities(entityType: String): Seq[String] = {
    sentences.flatMap { sentence =>
      sentence.filter(_.eType == entityType).map(_.eVal)
    }.toSeq
  }
}
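With this abstraction in place, pulling all persons or locations out of a document becomes a one-liner (reusing the processor and the extractEntities method defined earlier):

val processor: Processor = new FastNLPProcessor()
val entities = Entities(extractEntities(processor, "David Bowie was born in London"))

entities.getEntities("PERSON")   // Seq(David Bowie)
entities.getEntities("LOCATION") // Seq(London)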

Building scalable code

We have now defined our NLP framework and abstracted most of the complex logic into a set of methods and convenient classes. The next step is to integrate this code within a Spark context and start processing text content at scale. In order to write scalable code, one needs to take extra care to address the following points:

  • Any use of a non-serializable class within a Spark job must be carefully declared inside a closure to avoid a NotSerializableException being raised (a sketch of this anti-pattern follows this list). Please refer to the Goose library serialization issues discussed in the previous section.
  • Whenever we create a new instance of FastNLPProcessor (more precisely, the first time we hit its annotate method, since the models are lazily loaded), all the required models are retrieved from the classpath, deserialized, and loaded into memory. This process takes around 10 seconds to complete.
  • In addition to the instantiation process being quite slow, it is worth mentioning that the models can be very large (around a gigabyte) and that keeping them all in memory will steadily consume our available heap space.
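To make the first point concrete, here is a sketch of the anti-pattern to avoid (assuming an RDD[String] named corpusRdd, as used later in this section); with a non-serializable processor, Spark cannot ship the closure to the executors:

// Anti-pattern: the processor is created on the driver and captured by the
// map closure, so Spark must serialize it to ship the task to the executors.
// With a non-serializable processor, this raises a NotSerializableException.
val processor = new FastNLPProcessor()
val entityRdd = corpusRdd map { corpus =>
  extractEntities(processor, corpus)
}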

Build once, read many

For all these reasons, embedding our code as-is within a map function would be terribly inefficient (and would probably blow our available heap space). As per the example below, we leverage the mapPartitions pattern to amortize the cost of loading and deserializing the models and to reduce the amount of memory used by our executors. Processing the first record of each partition triggers the model loading and deserialization, and all subsequent records in that partition reuse the loaded models, limiting the expensive model transfer and initialization cost to once per partition rather than once per record.

def extract(corpusRdd: RDD[String]): RDD[Entities] = {
  corpusRdd mapPartitions { it =>
    // Instantiated once per partition, reused for every record in it
    val processor = new FastNLPProcessor()
    it map { corpus =>
      val entities = extractEntities(processor, corpus)
      Entities(entities)
    }
  }
}

The ultimate goal of this NLP scalability exercise is to load as few copies of the models as possible while processing as many records as possible in parallel. With a single executor, we would load the models only once but would defeat the purpose of parallel computing. With lots of executors, we would spend more time deserializing models than actually processing our text content. This trade-off is discussed in the Performance tuning section.

Scalability is also a state of mind

Because we designed our code locally before integrating it into Spark, we kept writing things in the most convenient way. This matters because scalability is not only about how fast your code runs in a big data environment, but also about how people feel about it and how efficiently developers interact with your API. As a developer, if you need to chain nested flatMap functions to perform what should be a simple transformation, your code simply does not scale! Thanks to our data structure being fully abstracted inside the Entities class, deriving the different RDDs from our NLP extraction takes a simple map function.

val entityRdd: RDD[Entities] = extract(corpusRdd)
entityRdd.persist(StorageLevel.DISK_ONLY)
entityRdd.count()
 
val perRdd = entityRdd.map(_.getEntities("PERSON"))
val locRdd = entityRdd.map(_.getEntities("LOCATION"))
val orgRdd = entityRdd.map(_.getEntities("ORGANIZATION"))

Tip

It is important to note the use of persist here. As with the HTML fetcher process, we deliberately cache the returned RDD to avoid all its underlying transformations being re-evaluated by any subsequent action we call. NLP processing is quite an expensive process, and you have to make sure it won't be executed twice; hence the DISK_ONLY cache here.

Performance tuning

In order to bring this application to scale, you need to ask yourself the usual key question: is this job I/O, memory, CPU, or network bound? NLP extraction is an expensive task, and loading the models is memory intensive. We may have to reduce the number of executors while allocating much more memory to each of them. To reflect these settings, we need to make sure our dataset is evenly partitioned, using at least as many partitions as executors. We also enforce this repartitioning by caching our RDD and calling a simple count action, which evaluates all our previous transformations (including the partitioning itself).

val corpusRdd: RDD[String] = inputRdd.repartition(120)
corpusRdd.cache()
corpusRdd.count()

val entityRdd: RDD[Entities] = extract(corpusRdd)
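As a point of reference, such a profile (fewer, larger executors) could be expressed through the standard Spark configuration properties; the figures below are purely illustrative and should be tuned against your own cluster and dataset:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("NLP entity extraction")
  // Illustrative values only: 12 executors x 10 cores = 120 concurrent
  // tasks, matching the 120 partitions requested above, while keeping
  // the number of loaded model copies down to 12
  .set("spark.executor.instances", "12")
  .set("spark.executor.cores", "10")
  .set("spark.executor.memory", "16g")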