GeoMesa

GeoMesa is an open source product designed to leverage the distributed nature of storage systems, such as Accumulo and Cassandra, to hold a distributed spatio-temporal database. With this design, GeoMesa is capable of running the large-scale geospatial analytics that are required for very large data sets, including GDELT.

We are going to use GeoMesa to store GDELT data and run our analytics across a large proportion of that data; this should give us access to enough data to train our model so that we can predict the future rise and fall of oil prices. Also, GeoMesa will enable us to plot large amounts of points on a map, so that we can visualize GDELT and any other useful data.

Installing

There is a very good tutorial on the GeoMesa website (www.geomesa.org) that guides the user through the installation process. Therefore, it is not our intention here to produce another how-to guide; there are, however, a few points worth noting that may save you time in getting everything up and running:

  • GeoMesa has a lot of components, and many of these have a lot of versions. It is very important to ensure that all of the versions of the software stack match exactly with the versions specified in the GeoMesa maven POMs. Of particular interest are Hadoop, Zookeeper, and Accumulo; the version locations can be found in the root pom.xml file in the GeoMesa tutorial and other related downloads.
  • At the time of writing, there are some additional issues when integrating GeoMesa with some of the Hadoop vendor stacks. If you are able, use GeoMesa with your own stack of Hadoop/Accumulo and so on, to ensure version compatibility.
  • The GeoMesa version dependency labeling has changed from version 1.3.0. It is very important that you ensure all of the versions line up with your chosen version of GeoMesa; if there are any conflicting classes then there will definitely be problems at some point down the line.
  • If you have not used Accumulo before, we have discussed it in detail in other chapters within this book. An initial familiarization will help greatly when using GeoMesa (see Chapter 7, Building Communities).
  • When using Accumulo 1.6 or greater with GeoMesa, there is the option to use Accumulo namespaces. If you are unfamiliar with these, then opt not to use namespaces and simply copy the GeoMesa runtime JAR into /lib/ext in your Accumulo root folder.
  • GeoMesa uses a few shell scripts; due to the nature of operating systems, there may be the odd problem with running these scripts, depending upon your platform. The issues are minor and can be fixed with some quick Internet searches; for example, when running jai-image.sh there was a minor issue with user confirmation on Mac OS X.
  • The GeoMesa maven repository can be found at https://repo.locationtech.org/content/repositories/releases/org/locationtech/geomesa/

Once you are able to successfully run GeoMesa from the command line, we can move on to the next section.

GDELT Ingest

The next stage is to obtain the GDELT data and load it into GeoMesa. There are a number of options here, depending upon how you plan to proceed; if you are just working through this chapter, then you can use a script to download the data in one go:

$ mkdir gdelt && cd gdelt
$ wget http://data.gdeltproject.org/events/md5sums
$ for file in `cat md5sums | cut -d' ' -f3 | grep '^201[56]'` ; do wget http://data.gdeltproject.org/events/$file ; done
$ md5sum -c md5sums 2>&1 | grep '^201[56]'

This will download and verify all of the GDELT events data for 2015 and 2016. The amount of data required is something we need to estimate at this stage, as we do not know how our algorithm is going to work out, so we have chosen two years' worth to start with.

An alternative to the script is to read Chapter 2, Data Acquisition, which explains in detail how to configure Apache NiFi to download the GDELT data in real time and load it into HDFS ready for use. Otherwise, a script to transfer the preceding data to HDFS is shown as follows:

$ ls -1 *.zip | xargs -n 1 unzip
$ rm *.zip
$ hdfs dfs -mkdir -p hdfs:///data/gdelt/
$ hdfs dfs -copyFromLocal *.CSV hdfs:///data/gdelt/

Note

HDFS uses data blocks; we want to ensure that files are stored as efficiently as possible. Writing a method to aggregate files up to the HDFS block size (64 MB by default) will ensure the NameNode memory is not filled with many entries for lots of small files, and will also make processing more efficient. Files larger than one block (file size > 64 MB) are simply split across multiple blocks.
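For example, a minimal sketch of such an aggregation, using the Hadoop FileSystem API, might look like the following; the function name, the output file naming, and the assumption that fs.defaultFS points at your cluster are all illustrative:

import java.io.File
import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: roll many small local CSVs into HDFS files of roughly one block each,
// so the NameNode tracks a few large entries rather than thousands of small ones.
def aggregateToBlocks(localDir: String, hdfsDir: String,
                      blockSize: Long = 64L * 1024 * 1024): Unit = {
  val fs = FileSystem.get(new Configuration())
  var chunk = 0
  var written = 0L
  var out = fs.create(new Path(s"$hdfsDir/gdelt-part-$chunk.csv"))
  new File(localDir).listFiles().filter(_.getName.endsWith(".CSV")).foreach { file =>
    if (written >= blockSize) {
      // the current file has filled a block; roll over to a new HDFS file
      out.close(); chunk += 1; written = 0L
      out = fs.create(new Path(s"$hdfsDir/gdelt-part-$chunk.csv"))
    }
    val bytes = Files.readAllBytes(file.toPath)
    out.write(bytes)
    written += bytes.length
  }
  out.close()
}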

We have a substantial amount of data in HDFS (approximately 48 GB for 2015/16). Now, we will load this to Accumulo via GeoMesa.

GeoMesa Ingest

The GeoMesa tutorials discuss the idea of loading the data from HDFS to Accumulo using a MapReduce job. Let's take a look at this and create a Spark equivalent.

MapReduce to Spark

Since MapReduce (MR) is generally considered dead, or at least dying, it is very useful to know how to create Spark jobs from those existing in MR. The following method can be applied to any MR job. We will consider the GeoMesa Accumulo loading job described in the GeoMesa tutorial (geomesa-examples-gdelt) for this case.

An MR job is typically made up of three parts: the mapper, the reducer, and the driver. The GeoMesa example is a map-only job and therefore requires no reducer. The job takes a GDELT input line, creates a (Key, Value) pair from an empty Text object and the created GeoMesa SimpleFeature, and uses the GeoMesaOutputFormat to load the data into Accumulo. The full code of the MR job can be found in our repository; next, we will work through the key parts and suggest the changes required for Spark.

The job is initiated from the main method; the first few lines are related to parsing the required options from the command line, such as the Accumulo username and password. We then reach:

SimpleFeatureType featureType =
    buildGDELTFeatureType(featureName);
DataStore ds = DataStoreFinder.getDataStore(dsConf);
ds.createSchema(featureType);
runMapReduceJob(featureName, dsConf,
    new Path(cmd.getOptionValue(INGEST_FILE)));

The GeoMesa SimpleFeatureType is the primary mechanism used to store data in a GeoMesa data store and it needs to be initialized once, along with the data store initialization. Once this is done we execute the MR job itself. In Spark, we can pass the arguments via the command line as before, and then do the one-off setup:

spark-submit --class io.gzet.geomesa.ingest \
             --master yarn \
             geomesa-ingest.jar <accumulo-instance-id>
...

The contents of the jar contain a standard Spark job:

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Geomesa Ingest"))

Parse the command line arguments as before, as well as performing the initialization:

val featureType = buildGDELTFeatureType(featureName)
val ds = DataStoreFinder.getDataStore(dsConf)
ds.createSchema(featureType)

Now we can load the data from HDFS, using wildcards if required. This creates one partition for each block of the file (64 MB default), resulting in an RDD[String]:

val distDataRDD = sc.textFile("hdfs:///data/gdelt/*.CSV")

Or we can fix the number of partitions, depending upon our available resources:

val distDataRDD = sc.textFile("hdfs:///data/gdelt/*.CSV", 20)

Then we can perform the map, where we can embed the function to replace the processing in the original MR map method. We create a tuple (Text, SimpleFeature) to replicate a (Key, Value) pair, so that we can use the OutputFormat in the next step. When Scala tuples are created in this way, the resulting RDD gains extra methods, such as reduceByKey, which is functionally equivalent to the MR reducer (see the following for further information on what we should really be using: mapPartitions):

val processedRDD = distDataRDD.map(s => {
   // Processing as before to build the SimpleFeature
   (new Text, simpleFeature)
})

Then, we can finally output to Accumulo using the GeoMesaOutputFormat from the original job:

processedRDD.saveAsNewAPIHadoopFile("output/path", classOf[Text], classOf[SimpleFeature], classOf[GeoMesaOutputFormat])

At this stage, we have not mentioned the setup method in the MR job. This method is called before any input is processed, to allocate an expensive resource such as a database connection or, in our case, a reusable object; a corresponding cleanup method is then used to release that resource if it would otherwise persist out of scope. In our case, the setup method creates a SimpleFeatureBuilder, which can be reused during each call of the mapper to build SimpleFeatures for output; there is no cleanup method, as the memory is automatically released when the object goes out of scope (when the code has completed).

The Spark map function only operates on one input at a time and provides no means to execute code before or after transforming a batch of values. It might look reasonable to simply put the setup and cleanup code before and after a call to map:

// do setup work
val processedRDD = distDataRDD.map(s => {
   // Processing as before to build the SimpleFeature
   (new Text, simpleFeature)
})
// do cleanup work

But, this fails for several reasons:

  • It puts any objects used in map into the map function's closure, which requires that they be serializable (for example, by implementing java.io.Serializable). Not all objects will be serializable, and thus exceptions may be thrown.
  • The map function is a transformation, rather than an action, and is lazily evaluated. Thus, instructions after the map function are not guaranteed to be executed immediately.
  • Even if the preceding issues were covered for a particular implementation, we would only be executing code on the driver, not necessarily freeing resources allocated by serialized copies.

The closest counterpart to a mapper in Spark is the mapPartitions method. This method does not map just one value to another; rather, it maps an Iterator of values to an Iterator of other values, akin to a bulk-map method. This means that mapPartitions can allocate resources locally at its start:

val processedRDD = distDataRDD.mapPartitions { valueIterator =>
   // setup code for SimpleFeatureBuilder
   val transformed = valueIterator.map( . . . )
   transformed
}

However, releasing resources (cleanup) is not straightforward, as we still face the lazy evaluation problem: if resources are freed immediately after the call to map, the iterator may not have been evaluated before those resources disappear. One solution to this is as follows:

val processedRDD = distDataRDD.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
    // return an empty Iterator for empty partitions
    valueIterator
  } else {
    // setup code for SimpleFeatureBuilder
    valueIterator.map { s =>
      // Processing as before to build the SimpleFeature
      val simpleFeature = ... // build the feature from the input line s
      if (!valueIterator.hasNext) {
        // cleanup here
      }
      simpleFeature
    }
  }
}

Now that we have the Spark code for the ingest, there is an additional change that we could make, which is to add a Geohash field (see the following section for more information on how to produce this field). To insert this field into the code, we will need an additional entry at the end of the GDELT attributes list:

Geohash:String 

And a line to set the value of the attribute on the simpleFeature:

simpleFeature.setAttribute("Geohash", calculatedGeoHash)
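
For example, the attribute might be populated during the per-line processing along the following lines. This is only a sketch: latIdx and lonIdx are hypothetical placeholders for the positions of the event's latitude and longitude columns in the tab-separated GDELT record, and encode comes from the Geohash implementation shown in the next section:

val fields = s.split("\t")
// latIdx and lonIdx are assumed positions; check them against the GDELT column layout
val calculatedGeoHash = Geohash.encode(fields(latIdx).toDouble, fields(lonIdx).toDouble)
simpleFeature.setAttribute("Geohash", calculatedGeoHash)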

Finally, we can run our Spark job to load the GeoMesa Accumulo instance with the GDELT data from HDFS. The two years of GDELT is around 100 million entries! You can check how much data is in Accumulo by using the Accumulo shell, run from the accumulo/bin directory:

./accumulo shell -u username -p password -e "scan -t gdelt_records -np" | wc

Geohash

Geohash is a geocoding system invented by Gustavo Niemeyer. It is a hierarchical spatial data structure that subdivides space into buckets of grid shape, and is one of the many applications of what is known as a Z-order curve, and of space-filling curves more generally.

Geohashes offer properties like arbitrary precision and the possibility of gradually removing characters from the end of the code to reduce its size (and gradually lose precision).

As a consequence of the gradual precision degradation, nearby geographical locations will often (but not always) present similar prefixes. The longer a shared prefix is, the closer the two locations are; this is very useful in GeoMesa should we want to use points from a particular area, as we can use the Geohash field added in the preceding ingest code.

The main usages of Geohashes are:

  • As a unique identifier
  • To represent point data, for example, in databases

When used in a database, the structure of geo-hashed data has two advantages. First, data indexed by Geohash will have all points for a given rectangular area in contiguous slices (the number of slices depends on the precision required and the presence of Geohash fault lines). This is especially useful in database systems where queries on a single index are much easier or faster than multiple-index queries: Accumulo, for example. Second, this index structure can be used for a quick-and-dirty proximity search: the closest points are often among the closest Geohashes. These advantages make Geohashes ideal for use in GeoMesa. The following is an extract of code from David Allsopp's excellent Geohash Scala implementation (https://github.com/davidallsopp/geohash-scala), which can be used to produce Geohashes based on a lat/lon input:

/** Geohash encoding/decoding as per http://en.wikipedia.org/wiki/Geohash */
object Geohash {

  // Note: toBase32, toBits and isValid are helper functions provided
  // elsewhere in the geohash-scala library; they are omitted from this extract.

  val LAT_RANGE = (-90.0, 90.0)
  val LON_RANGE = (-180.0, 180.0)

  // Aliases, utility functions
  type Bounds = (Double, Double)
  private def mid(b: Bounds) = (b._1 + b._2) / 2.0
  implicit class BoundedNum(x: Double) { def in(b: Bounds): Boolean = x >= b._1 && x <= b._2 }

  /**
   * Encode lat/long as a base32 geohash.
   *
   * Precision (optional) is the number of base32 chars desired; default is 12, which gives precision well under a meter.
   */
  def encode(lat: Double, lon: Double, precision: Int=12): String = { // scalastyle:ignore
    require(lat in LAT_RANGE, "Latitude out of range")
    require(lon in LON_RANGE, "Longitude out of range")
    require(precision > 0, "Precision must be a positive integer")
    val rem = precision % 2 // if precision is odd, we need an extra bit so the total bits divide by 5
    val numbits = (precision * 5) / 2
    val latBits = findBits(lat, LAT_RANGE, numbits)
    val lonBits = findBits(lon, LON_RANGE, numbits + rem)
    val bits = intercalate(lonBits, latBits)
    bits.grouped(5).map(toBase32).mkString // scalastyle:ignore
  }

  private def findBits(part: Double, bounds: Bounds, p: Int): List[Boolean] = {
    if (p == 0) Nil
    else {
      val avg = mid(bounds)
      if (part >= avg) true :: findBits(part, (avg, bounds._2), p - 1)
// >= to match geohash.org encoding
      else false :: findBits(part, (bounds._1, avg), p - 1)
    }
  }

  /**
   * Decode a base32 geohash into a tuple of (lat, lon)
   */
  def decode(hash: String): (Double, Double) = {
    require(isValid(hash), "Not a valid Base32 number")
    val (odd, even) = toBits(hash).foldRight((List[Boolean](), List[Boolean]())) { case (b, (a1, a2)) => (b :: a2, a1) }
    val lon = mid(decodeBits(LON_RANGE, odd))
    val lat = mid(decodeBits(LAT_RANGE, even))
    (lat, lon)
  }

  private def decodeBits(bounds: Bounds, bits: Seq[Boolean]) =
    bits.foldLeft(bounds)((acc, bit) => if (bit) (mid(acc), acc._2) else (acc._1, mid(acc)))
}

// In the library, intercalate is defined alongside the Geohash object:
def intercalate[A](a: List[A], b: List[A]): List[A] = a match {
 case h :: t => h :: intercalate(b, t)
 case _ => b
}
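
As a quick sanity check, the well-known example coordinates from the Geohash Wikipedia article should round-trip as expected (assuming the library helper functions mentioned above are on the classpath):

Geohash.encode(57.64911, 10.40744, 11)   // "u4pruydqqvj"
Geohash.decode("u4pruydqqvj")            // approximately (57.64911, 10.40744)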

One limitation of the Geohash algorithm lies in attempting to use it to find points in proximity to each other based on a common prefix. Edge-case locations that are close to each other, but on opposite sides of the 180 degree meridian, will result in Geohash codes with no common prefix (they have very different longitudes despite being physically near). Similarly, points that are close by at the North and South poles will have very different Geohashes (again, different longitudes for nearby physical locations).

Also, two close locations on either side of the equator (or Greenwich meridian) will not have a long common prefix since they belong to different halves of the world; one location's binary latitude (or longitude) will be 011111... and the other 100000... so they will not have a common prefix and most bits will be flipped.

In order to do a proximity search, we could compute the southwest corner (a low Geohash with low latitude and longitude) and the northeast corner (a high Geohash with high latitude and longitude) of a bounding box, and search for Geohashes between those two. This will retrieve all points in the Z-order curve between the two corners; this approach also breaks down at the 180 degree meridian and at the poles.
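
A minimal sketch of this bounding-box approach, reusing the encode function above (the six-character precision and the geohashes collection are illustrative assumptions):

// Crude proximity filter: keep geohashes that sort between the south-west
// and north-east corners. As noted, this breaks down across the 180 degree
// meridian and at the poles.
def inBoundingBox(geohashes: Seq[String],
                  minLat: Double, minLon: Double,
                  maxLat: Double, maxLon: Double,
                  precision: Int = 6): Seq[String] = {
  val sw = Geohash.encode(minLat, minLon, precision)
  val ne = Geohash.encode(maxLat, maxLon, precision)
  geohashes.filter(gh => gh.take(precision) >= sw && gh.take(precision) <= ne)
}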

Finally, since a Geohash (in this implementation) is based on coordinates of longitude and latitude, the distance between two Geohashes reflects the distance in latitude/longitude coordinates between two points, which does not translate to actual distance. In this case, we can use the Haversine formula:

d = 2r · arcsin( √( sin²((φ2 − φ1) / 2) + cos(φ1) · cos(φ2) · sin²((λ2 − λ1) / 2) ) )

This gives us the actual distance between the two points, taking into account the curvature of the Earth, where:

  • r is the radius of the sphere
  • φ1, φ2 are the latitudes of point 1 and point 2, in radians
  • λ1, λ2 are the longitudes of point 1 and point 2, in radians
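
A direct transcription of the formula into Scala might look like the following sketch, taking the Earth's mean radius to be 6371 km:

import scala.math._

val r = 6371.0   // Earth's mean radius in kilometres

// Great-circle distance between two lat/lon points, in kilometres
def haversine(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val phi1 = toRadians(lat1)
  val phi2 = toRadians(lat2)
  val dPhi = toRadians(lat2 - lat1)
  val dLambda = toRadians(lon2 - lon1)
  val a = pow(sin(dPhi / 2), 2) + cos(phi1) * cos(phi2) * pow(sin(dLambda / 2), 2)
  2 * r * asin(sqrt(a))
}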

GeoServer

Now that we have successfully loaded GDELT data into Accumulo via GeoMesa, we can work towards visualizing that data on a map; this feature is very useful for plotting the results of analytics on world maps, for example. GeoMesa integrates well with GeoServer for this purpose. GeoServer is an Open Geospatial Consortium (OGC) compliant implementation of a number of standards, including Web Feature Service (WFS) and Web Map Service (WMS), and "publishes data from any major spatial data source".

We are going to use GeoServer to view the results from our analytics in a clean, presentable way. Again, we are not going to delve into getting GeoServer up and running, as there is a very good tutorial in the GeoMesa documentation that enables the integration of the two. A couple of common points to watch out for are as follows:

  • The system uses Java Advanced Imaging (JAI) libraries; if you have issues with these, specifically on a Mac, then these can often be fixed by removing the libraries from the default Java installation:
            rm /System/Library/Java/Extensions/jai_*

    This will then allow the GeoServer versions to be used, located in $GEOSERVER_HOME/webapps/geoserver/WEB-INF/lib/

  • Again, we cannot stress enough the importance of versions. You must be very clear about which versions of the main modules you are using, for example, Hadoop, Accumulo, Zookeeper, and most importantly, GeoMesa. If you mix versions, you will see problems, and the stack traces often mask the true issue. If you do get exceptions, check and double-check your versions.

Map layers

Once GeoServer is running, we can create a layer for visualization. GeoServer enables us to publish a single layer or a group of layers to produce a graphic. When we create a layer, we can specify the bounding box, view the feature (the SimpleFeature we created in the Spark code previously), and even run a Common Query Language (CQL) query to filter the data (more on this below). After a layer has been created, selecting Layer Preview and the JPG option will produce a URL with a graphic similar to the following; the temporal bounding here is for January 2016, so that the map is not overcrowded:

[Image: layer preview of GDELT events for January 2016, rendered by GeoServer]

The URL can be used to produce other graphics, simply by manipulating the arguments. A brief breakdown of the URL is given as follows:

The GeoServer URL, with the geomesa workspace and the WMS endpoint:

http://localhost:8080/geoserver/geomesa/wms?

The request type:

service=WMS&version=1.1.0&request=GetMap& 

The layers and styles:

layers=geomesa:event&styles=& 

Set the layer transparency, if required:

transparency=true& 

The CQL statement, in this case selecting any row with GoldsteinScale>8:

cql_filter=GoldsteinScale>8& 

The bounding box bbox:

bbox=-180.0,-90.0,180.0,90.0& 

The height and width of the graphic:

width=768&height=384& 

The spatial reference system (srs) and image format:

srs=EPSG:4326&format=image%2Fjpeg& 

Filter the content by temporal query bounds:

time=2016-01-01T00:00:00.000Z/2016-01-30T23:00:00.000Z 
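
Reassembling the fragments gives the complete GetMap request as a single line:

http://localhost:8080/geoserver/geomesa/wms?service=WMS&version=1.1.0&request=GetMap&layers=geomesa:event&styles=&transparency=true&cql_filter=GoldsteinScale>8&bbox=-180.0,-90.0,180.0,90.0&width=768&height=384&srs=EPSG:4326&format=image%2Fjpeg&time=2016-01-01T00:00:00.000Z/2016-01-30T23:00:00.000Z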

The final step for this section is to attach a world map to this layer so that the image becomes more readable. If you search the Internet for world map shape files, there are a number of options; we have used one from http://thematicmapping.org. Adding one of these into GeoServer as a shape-file store, and then creating and publishing a layer before creating a layer group of our GDELT data and the shape-file, will produce an image similar to this:

[Image: GDELT events with GoldsteinScale > 8 for January 2016, overlaid on a world map shape-file layer]

To make things a bit more interesting, we have filtered the events based on the GoldsteinScale field in the FeatureType. By adding cql_filter=GoldsteinScale > 8 to the URL, we can plot all of the points where the GoldsteinScale score was greater than eight; so essentially, the above image shows us where the highest levels of positive sentiment were located in the world, in January 2016!

CQL

Common Query Language (CQL) is a plain text query language created by the OGC for the Catalogue Web Services specification. It is a human-readable query language (unlike, for example, OGC filters) and uses a similar syntax to SQL. Although similar to SQL, CQL has much less functionality; for example, it is quite strict in requiring an attribute to be on the left side of any comparison operator.

The following lists the CQL supported operators:

  • Comparison operators: =, <>, >, >=, <, <=
  • ID, list and other operators: BETWEEN, BEFORE, AFTER, LIKE, IS, EXISTS, NOT, IN
  • Arithmetic expression operators: +, -, *, /
  • Geometric operators: EQUALS, DISJOINT, INTERSECTS, TOUCHES, CROSSES, WITHIN, CONTAINS, OVERLAPS, RELATE, DWITHIN, BEYOND

Due to the limitations of CQL, GeoServer provides an extended version of CQL called ECQL. ECQL provides much of the missing functionality of CQL, providing a more flexible language that has more in common with SQL. GeoServer supports the use of both CQL and ECQL in WMS and WFS requests.

The quickest way to test CQL queries is to amend the URL of a layer, such as the one we created above when using JPGs, or to use the CQL box at the bottom of the layer options within GeoServer.
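
For example, filters along the following lines could be pasted into that box or supplied as cql_filter; GoldsteinScale and SQLDATE come from our GDELT feature type, and the values are arbitrary:

GoldsteinScale > 8
GoldsteinScale BETWEEN 5 AND 10
GoldsteinScale > 8 AND SQLDATE AFTER 2016-01-01T00:00:00Z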

If we have several layers defined in one WMS request, such as:

http://localhost:8080/geoserver/wms?service=WMS&version=1.1.0&request=GetMap&layers=layer1,layer2,layer3   ...   

Then we may want to filter just one of those layers with the CQL query. In this case, CQL filters must be ordered in the same way that the layers are; we use the INCLUDE keyword for the layers that we don't want to filter and delimit them using a ";". For example, to filter only layer2 in our example, the WMS request would appear thus:

http://localhost:8080/geoserver/wms?service=WMS&version=1.1.0&request=GetMap&layers=layer1,layer2,layer3&cql_filter=INCLUDE;(LAYER2_COL='value');INCLUDE...   

Note

Be aware when using columns of type Date; we need to determine their format before attempting any CQL with them. Usually they will be in ISO 8601 format: 2012-01-01T00:00:00Z. However, different formats may be present depending upon how the data was loaded. In our example, we have ensured that SQLDATE is in the correct format.
