Using Elasticsearch as a caching layer

Our ultimate goal is to train a new classifier at each batch (every 15 minutes). However, the classifier will be trained using more than just the few records we downloaded within that current batch. We somehow have to cache the text content over a longer period of time (set to 24 hours) and retrieve it whenever we need to train a new classifier. With Larry Wall's quote in mind, we will try to be as lazy as possible in maintaining data consistency over this online layer. The basic idea is to use a time-to-live (TTL) parameter that will seamlessly drop any outdated record. The Cassandra database provides this feature out of the box (as do HBase and Accumulo), but Elasticsearch is already part of our core architecture and can easily be used for that purpose. We will create the following mapping for the gzet/twitter index with the _ttl parameter enabled:

$ curl -XPUT 'http://localhost:9200/gzet'
$ curl -XPUT 'http://localhost:9200/gzet/_mapping/twitter' -d '
{
  "_ttl": {
    "enabled": true
  },
  "properties": {
    "body": {
      "type": "string"
    },
    "time": {
      "type": "date",
      "format": "yyyy-MM-dd HH:mm:ss"
    },
    "tags": {
      "type": "string",
      "index": "not_analyzed"
    },
    "batch": {
      "type": "integer"
    }
  }
}'

Our records will exist in Elasticsearch for a period of 24 hours (the TTL value is defined on insert), after which any record will simply be discarded. As we delegate the maintenance task to Elasticsearch, we can safely pull all the records from our online cache without worrying too much about outdated values. All the retrieved data will be used as a training set for our classifier. The high-level process is shown in the following figure:

Figure 10: Using Elasticsearch as a caching layer

For each RDD in our data stream, we retrieve all existing records from the previous 24 hours, cache our current set of Twitter content, and train a new classifier. Accessing the underlying RDDs of a stream is a simple operation using the foreachRDD function (a sketch of how the twitterContent stream itself might be built is shown below).
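The twitterContent stream used throughout this section is assumed to pair each tweet body with its hashtags, which will later act as classification labels. Its exact construction depends on the Twitter connector introduced earlier in the chapter; the following is only a minimal sketch, assuming a twitter4j Status DStream named twitterStream and a hypothetical helper toTwitterContent:

import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

// Hypothetical sketch: pair each tweet's text with its hashtags.
// `twitterStream` (a DStream[Status]) is assumed to come from the
// Twitter receiver set up earlier in the chapter.
def toTwitterContent(twitterStream: DStream[Status]): DStream[(String, Array[String])] = {
  twitterStream map { status =>
    val body = status.getText
    val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
    (body, tags)
  } filter { case (_, tags) =>
    tags.nonEmpty // only keep tweets we can label
  }
}

val twitterContent = toTwitterContent(twitterStream)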

We persist the current records into Elasticsearch using the saveToEsWithMeta function from the Elasticsearch Spark connector API. This function accepts the TTL parameter as part of the metadata map (set to 24 hours, expressed in seconds and formatted as a String):

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._

def saveCurrentBatch(twitterContent: RDD[(String, Array[String])]) = {
  twitterContent mapPartitions { it =>
    // SimpleDateFormat is not thread-safe, so build one instance per partition
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    it map { case (content, tags) =>
      val data = Map(
        "time" -> sdf.format(new Date()),
        "body" -> content,
        "tags" -> tags
      )
      val metadata = Map(
        TTL -> "86400s" // 24 hours, in seconds
      )
      (metadata, data)
    }
  } saveToEsWithMeta "gzet/twitter"
}

It is worth executing a simple check on Elasticsearch in order to make sure that the TTL parameter has been properly set and is effectively decreasing over time. Once it has reached 0, the indexed document should be dropped. The following command prints out the _ttl value (reported in milliseconds) for document ID [AVRr9LaCoYjYhZG9lvBl] every second, using the jq utility (https://stedolan.github.io/jq/download/) to parse JSON objects from the command line:

$ while true ; do TTL=`curl -XGET 'http://localhost:9200/gzet/twitter/AVRr9LaCoYjYhZG9lvBl' 2>/dev/null | jq "._ttl"`; echo "TTL is $TTL"; sleep 1; done

../..
TTL is 48366081
TTL is 48365060
TTL is 48364038
TTL is 48363016
../..

All the online records (records with an unexpired TTL) can be retrieved into an RDD using the following function. Similar to what we did in Chapter 7, Building communities, extracting lists from Elasticsearch is far easier using JSON parsing than with Spark DataFrames:

import org.apache.spark.SparkContext
import org.elasticsearch.spark._
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

def getOnlineRecords(sc: SparkContext) = {
  sc.esJsonRDD("gzet/twitter").values map { jsonStr =>
    implicit val format = DefaultFormats
    val json = parse(jsonStr)
    val tags = (json \ "tags").extract[Array[String]]
    val body = (json \ "body").extract[String]
    (body, tags)
  }
}

We retrieve all Twitter content from our caching layer while saving our current batch. The remaining task is to train our classification algorithm; this method is discussed in the following section:

twitterContent foreachRDD { batch =>

  val sc = batch.sparkContext
  batch.cache()

  if (batch.count() > 0) {
    val window = getOnlineRecords(sc)
    saveCurrentBatch(batch)
    val trainingSet = batch.union(window)
    // Train method described hereafter
    trainAndSave(trainingSet, modelOutputDir)
  }

  batch.unpersist(blocking = false)
}
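
Note that both saveToEsWithMeta and esJsonRDD rely on the Elasticsearch connection settings carried by the Spark configuration. The following is a minimal sketch, assuming a single Elasticsearch node running locally on the default port; the application name is purely illustrative:

import org.apache.spark.SparkConf

// Assumed connection settings for a local Elasticsearch node; adjust
// es.nodes / es.port to match your cluster. These properties are read
// by the elasticsearch-spark connector behind saveToEsWithMeta and esJsonRDD.
val sparkConf = new SparkConf()
  .setAppName("twitter-classifier")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")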