GIS lookup

In the previous section, we covered an interesting use case: how to extract location entities from unstructured data. In this section, we will make our enrichment process even smarter by retrieving the actual geographical coordinates (latitude and longitude) of the location entities we were able to identify. Given the input string London, can we detect the city of London, UK, together with its latitude and longitude? We will discuss how to build an efficient geo lookup system that does not rely on any external API and that can process location data at any scale by leveraging the Spark framework and the Reduce-Side-Join pattern. When building this lookup service, we will have to bear in mind that many places around the world share the same name (there are around 50 different places called Manchester in the US alone), and that an input record may not use the official name of the place it refers to (the official name of the commonly used Geneva/Switzerland is Genève).

GeoNames dataset

GeoNames (http://www.geonames.org/) is a geographical database that covers all countries, contains over 10 million place names with geographic coordinates, and is available for download free of charge. In this example, we will be using the allCountries.zip dataset (1.5 GB) together with the admin1CodesASCII.txt reference data in order to turn our location strings into valuable location objects with geo coordinates. We will keep only the records related to continents, countries, states, districts, and cities, together with major oceans, seas, rivers, lakes, and mountains, thus reducing the entire dataset by half. Although the admin codes dataset easily fits in memory, the GeoNames records must be processed within an RDD and need to be converted into the following case classes:

case class GeoName(
  geoId: Long,                // unique GeoNames identifier
  name: String,               // official place name
  altNames: Array[String],    // alternative names and translations
  country: Option[String],    // country code, when available
  adminCode: Option[String],  // administrative code, when available
  featureClass: Char,         // GeoNames feature class (A, P, H, T, L, ...)
  featureCode: String,        // GeoNames feature code (PPL, ADM1, CONT, ...)
  population: Long,           // population
  timezone: Array[String],    // timezone information
  geoPoint: GeoPoint          // latitude and longitude
)
 
case class GeoPoint(
  lat: Double,
  lon: Double
)
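
To make this filtering concrete, a predicate along the following lines could be applied while loading. The feature classes listed are the standard GeoNames codes, but the helper itself is our own sketch rather than the book's implementation:

// Hypothetical helper: keep only the GeoNames feature classes we care about.
// A = country/state/region, P = city/village, H = ocean/sea/river/lake,
// T = mountain, L = area (including continents).
def isRelevant(geoName: GeoName): Boolean =
  Set('A', 'P', 'H', 'T', 'L').contains(geoName.featureClass)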

We will not describe the process of parsing a flat file into a geoNameRDD here. The parser itself is quite straightforward: it processes a tab-delimited records file and converts each value as per the above case class definition. We expose the following static method instead:

val geoNameRdd: RDD[GeoName] = GeoNameLookup.load(
  sc,
  adminCodesPath,
  allCountriesPath
)
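
For reference, the core of such a parser could look like the following sketch. The column indices assume the public GeoNames allCountries.txt layout, and the parse helper and parsedRdd value are our own illustration, not the actual GeoNameLookup implementation:

// A simplified sketch of the line parser used inside such a load method.
// Error handling and admin code enrichment are omitted here.
def parse(line: String): GeoName = {
  val fields = line.split("\t", -1)
  GeoName(
    geoId        = fields(0).toLong,
    name         = fields(1),
    altNames     = fields(3).split(",").filter(_.nonEmpty),
    country      = Option(fields(8)).filter(_.nonEmpty),
    adminCode    = Option(fields(10)).filter(_.nonEmpty),
    featureClass = fields(6).headOption.getOrElse(' '),
    featureCode  = fields(7),
    population   = if (fields(14).isEmpty) 0L else fields(14).toLong,
    timezone     = Array(fields(17)),
    geoPoint     = GeoPoint(fields(4).toDouble, fields(5).toDouble)
  )
}

// The raw records could then be loaded as:
val parsedRdd: RDD[GeoName] = sc.textFile(allCountriesPath).map(parse)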

Building an efficient join

The main lookup strategy will rely on a join operation executed against both our GeoNames data and our input data. In order to maximize the chance of getting a location match, we will expand our initial data using a flatMap function over all the possible alternative names, hence drastically increasing the initial size from 5 million to approximately 20 million records. We also make sure to clean names of any accents, dashes, or fuzzy characters they might contain:

val geoAltNameRdd = geoNameRdd.flatMap { geoName =>
  // Index each place once per cleaned alternative name
  geoName.altNames map { altName =>
    (clean(altName), geoName)
  }
} filter { case (altName, geoName) =>
  StringUtils.isNotEmpty(altName)
}.distinct()
 
val inputNameRdd = inputRdd.map { name =>
  (clean(name), name)
} filter { case (cleanName, name) =>
  StringUtils.isNotEmpty(cleanName)
}
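
The clean function itself is not shown in this section; a minimal sketch of what it could look like (lowercasing, stripping accents, and dropping anything that is not a plain letter or a space) is given below. The exact normalization rules are an assumption on our side:

import java.text.Normalizer

// Hypothetical normalization: lowercase, strip accents, drop anything that
// is not a plain letter or a space, and collapse repeated whitespace.
def clean(name: String): String = {
  val lowered = Normalizer.normalize(name.toLowerCase(), Normalizer.Form.NFD)
  lowered
    .replaceAll("\\p{M}", "")       // remove diacritical marks
    .replaceAll("[^a-z\\s]", " ")   // replace dashes and fuzzy characters
    .replaceAll("\\s+", " ")        // collapse whitespace
    .trim
}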

And voilà, the remaining process is a simple join operation between the cleaned input and the cleaned geoNameRDD. Finally, we can group all the matching places into a simple set of GeoName objects:

def geoLookup(
  inputNameRdd: RDD[(String, String)],
  geoNameRdd: RDD[(String, GeoName)]
): RDD[(String, Array[GeoName])] = {

  inputNameRdd
    .join(geoNameRdd)
    .map { case (key, (name, geo)) =>
      (name, geo)
    }
    .groupByKey()
    .mapValues(_.toSet.toArray)

}

An interesting pattern can be discussed here: how does Spark perform a join operation on large datasets? Called the Reduce-Side-Join pattern in legacy MapReduce, it requires the framework to hash all the keys from both RDDs and send all elements with the same key (the same hash) to a dedicated node in order to join their values locally. The principle of the Reduce-Side-Join is illustrated in Figure 2. Because a Reduce-Side-Join is an expensive, network-bound task, we must take special care addressing the following two points:

  • The GeoNames dataset is much larger than our input RDD. We would be wasting lots of effort shuffling data that wouldn't match anyway, making our join not only inefficient, but largely useless.
  • The GeoNames dataset does not change over time. It wouldn't make sense to re-shuffle this immutable dataset on a pseudo real-time system (such as Spark Streaming) where location events are received in batches.

We can build two different strategies: an offline one and an online one. The former will make use of a Bloom filter to drastically reduce the amount of data to be shuffled, while the latter will partition our RDD by key in order to reduce the network cost associated with a join operation.

Figure 2: The Reduce-Side-Join

Offline strategy - Bloom filtering

A Bloom filter is a space-efficient probabilistic data structure used to test whether an element is a member of a set, with a limited probability of false positives. Heavily used in legacy MapReduce, Bloom filters also have implementations compiled for Scala. We will use the Bloom filter from the breeze library, available on Maven Central (breeze itself can be used without many dependency mismatches, in contrast to the ScalaNLP models we were discussing earlier).

<dependency>
  <groupId>org.scalanlp</groupId>
  <artifactId>breeze_2.11</artifactId>
  <version>0.12</version>
</dependency>
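
Before wiring it into Spark, a tiny standalone example of the breeze Bloom filter may help; the expected size and false positive rate below are arbitrary values chosen for illustration:

import breeze.util.BloomFilter

// Size the filter for roughly 1,000 expected elements and a 0.1% false
// positive rate, then add and test a few names.
val bf = BloomFilter.optimallySized[String](1000, 0.001)
bf += "london"
bf += "geneva"

bf.contains("london")    // true
bf.contains("timbuktu")  // false (with high probability)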

Because our input dataset is much smaller than our geoNameRDD, we will train a Bloom filter against the former by leveraging the mapPartitions function. Each executor builds its own Bloom filter, which we can then aggregate into a single object, thanks to its associative property, using a bitwise operator within a reduce function:

val bfSize = inputNameRdd.count()
val bf: BloomFilter[String] = inputNameRdd.keys.mapPartitions { it =>
  // One Bloom filter per partition, trained on the cleaned input names
  val bf = BloomFilter.optimallySized[String](bfSize, 0.001)
  it.foreach { cleanName =>
    bf += cleanName
  }
  Iterator(bf)
} reduce(_ | _)  // merge the per-partition filters with a bitwise OR

We test our filter against the full geoNameRDD in order to remove the places we know will not match, and finally execute the same join operation as before, but this time with much less data:

val geoNameFilterRdd = geoAltNameRdd filter {
  case (name, geo) =>
    bf.contains(name)
}

val resultRdd = geoLookup(inputNameRdd, geoNameFilterRdd)

By reducing the size of our geoNameRDD, we have been able to relieve a lot of the pressure on the shuffling process, making our join operation much more efficient. The resulting Reduce-Side-Join is illustrated in Figure 3:

Figure 3: Reduce-Side-Join with Bloom filter

Online strategy - Hash partitioning

In the offline process, we reduced the amount of data to be shuffled by pre-processing our geoNameRDD. In a streaming process, because every new batch of data is different, it wouldn't be worth filtering our reference data over and over. In such a scenario, we can greatly improve the join performance by pre-partitioning our geoNameRDD data by key, using a HashPartitioner with a number of partitions at least equal to the number of executors. Because the Spark framework knows which partitioner was used, only the input RDD will be sent to the shuffle, making our lookup service significantly faster. This is illustrated in Figure 4. Note the cache and count methods used to force the partitioning to be evaluated and persisted. Finally, we can safely execute our same join operation, this time with much less pressure on the network:

// Pre-partition the reference data by key and keep it in memory
val geoAltNamePartitionRdd = geoAltNameRdd.partitionBy(
  new HashPartitioner(100)
).cache()

// Force evaluation so that the partitioned RDD is materialized and cached
geoAltNamePartitionRdd.count()
val resultRdd = geoLookup(inputNameRdd, geoAltNamePartitionRdd)

Figure 4: Reduce-Side-Join with Hash partitioning

Content deduplication

With cities like Manchester being found 100 times in our dataset, we need to work on a deduplication strategy for similar names, taking into account that some places might not be as important as others in terms of their probability of being mentioned in a random piece of text.

Context learning

The most accurate method for deduplicating location content would probably be to study location records in their context, in the same way that Apple the company is to Google and Yahoo! what Apple the fruit is to banana and orange. By learning locations in their context, we would probably discover that the words beavers and bears are contextually close to the city of London in Ontario, Canada; as far as we know, the risk of bumping into a wild bear in London, UK is pretty small. Assuming one can access the text content, training such a model shouldn't be difficult, but accessing the geo coordinates would require building an indexed dictionary of every single place with both its geographical values and its most descriptive topics. Because we do not have access to such a dataset (although we could scrape Wikipedia), and because we do not want to assume one has access to the text content, we will simply rank places in order of importance.
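
Although we do not follow this route here, a quick way to experiment with the idea would be to train a word embedding model on the surrounding text and look at which terms sit closest to a place name. The sketch below uses Spark MLlib's Word2Vec purely as an illustration; the toy corpus and the parameters are placeholders, not part of the actual enrichment pipeline:

import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.rdd.RDD

// Toy corpus for illustration only; in practice this would be the tokenized
// text content surrounding each location mention.
val corpus: RDD[Seq[String]] = sc.parallelize(Seq(
  "beavers and bears near london ontario canada",
  "london ontario lies on the thames river in canada",
  "the city of london in the uk is on the thames"
).map(_.split(" ").toSeq))

val model = new Word2Vec()
  .setVectorSize(50)
  .setMinCount(1)
  .fit(corpus)

// Terms contextually closest to "london" in this corpus
model.findSynonyms("london", 5).foreach { case (word, similarity) =>
  println(s"$word -> $similarity")
}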

Location scoring

Given the different codes we pulled from the GeoNames website, we assume that a continent is more important than a country, a country more important than a state or a capital, and so on. This naive approach will make sense 80% of the time, but might return irrelevant results in some edge cases. Given the Manchester example, we would find the parish of Manchester, a major administrative division in Jamaica, instead of Manchester, a simple city in the UK. We can fix this issue by being less restrictive in terms of scoring and by sorting places of the same score in descending order of population. Returning the most important and relevant place makes sense, and most online APIs take this approach anyway, but is it fair for the less important cities? We improve our scoring engine by adding a unique reference ID to a context where several locations may be mentioned together. If a document only focuses on cities in Canada, and nothing mentions the United Kingdom, then London is most likely the place in Canada. If no country or state is mentioned, or if both Canada and the United Kingdom are found, we take the most important city named London in our dataset, which is London in the UK. The deduplication occurs by sorting all our matching records first by similar continents/countries/states mentioned in the context, then by importance, and finally by population. The first result is returned as our best candidate.
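
A minimal sketch of what such a ranking could look like is shown below. The score values, the helper names, and the context matching logic are our own illustration of the idea rather than the book's actual implementation:

// Hypothetical importance score derived from the GeoNames feature codes:
// lower is more important (continent < country < state/capital < city).
def importance(geo: GeoName): Int = geo.featureCode match {
  case "CONT"                          => 0  // continent
  case "PCLI"                          => 1  // independent country
  case code if code.startsWith("ADM1") => 2  // first-level admin division
  case "PPLC"                          => 2  // capital city
  case code if code.startsWith("PPL")  => 3  // populated place
  case _                               => 4
}

// Hypothetical deduplication: prefer candidates whose country appears in the
// context, then the most important feature, then the largest population.
def bestCandidate(
  candidates: Array[GeoName],
  contextCountries: Set[String]
): Option[GeoName] = {
  candidates.sortBy { geo =>
    val inContext = geo.country.exists(contextCountries.contains)
    (if (inContext) 0 else 1, importance(geo), -geo.population)
  }.headOption
}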
