Our ultimate goal is to train a new classifier at each batch (every 15 minutes). However, the classifier will be trained using more than just the few records downloaded within the current batch. We somehow have to cache the text content over a longer period of time (set to 24 hours) and retrieve it whenever we need to train a new classifier. With Larry Wall's quote in mind, we will try to be as lazy as possible in maintaining data consistency over this online layer. The basic idea is to use a time-to-live (TTL) parameter that will seamlessly drop any outdated record. The Cassandra database provides this feature out of the box (so do HBase and Accumulo), but Elasticsearch is already part of our core architecture and can easily be used for that purpose. We will create the following mapping for the gzet/twitter index with the _ttl parameter enabled:
$ curl -XPUT 'http://localhost:9200/gzet'
$ curl -XPUT 'http://localhost:9200/gzet/_mapping/twitter' -d '
{
  "_ttl" : { "enabled" : true },
  "properties": {
    "body": { "type": "string" },
    "time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
    "tags": { "type": "string", "index": "not_analyzed" },
    "batch": { "type": "integer" }
  }
}'
Our records will exist in Elasticsearch for a period of 24 hours (the TTL value is defined on insert), after which any record will simply be discarded. As we delegate this maintenance task to Elasticsearch, we can safely pull all available records from our online cache without worrying about outdated values. All the retrieved data will be used as a training set for our classifier. The high-level process is reported in the following figure:
For each RDD in our data stream, we retrieve all existing records from the previous 24 hours, cache our current set of Twitter content, and train a new classifier. Accessing the RDDs underlying a data stream is a simple operation using the foreachRDD function.
We persist the current records into Elasticsearch using the saveToEsWithMeta function from the elasticsearch-spark API. This function accepts the TTL parameter as part of its metadata map (set to 24 hours, expressed in seconds and formatted as a String):
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._

def saveCurrentBatch(twitterContent: RDD[(String, Array[String])]) = {
  twitterContent mapPartitions { it =>
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    it map { case (content, tags) =>
      val data = Map(
        "time" -> sdf.format(new Date()),
        "body" -> content,
        "tags" -> tags
      )
      // 24 hours, expressed in seconds
      val metadata = Map(TTL -> "86400s")
      (metadata, data)
    }
  } saveToEsWithMeta "gzet/twitter"
}
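The TTL string is easy to get wrong when hand-written (24 hours is 86,400 seconds). As a small sketch, a hypothetical helper such as the following, built only on the standard library's duration type, could derive the string from a duration instead of hard-coding it:

```scala
import scala.concurrent.duration._

// Hypothetical helper (not part of the connector API): formats a duration
// as the plain "<seconds>s" string expected in the TTL metadata field.
def ttlString(d: FiniteDuration): String = s"${d.toSeconds}s"

println(ttlString(24.hours))   // 86400s
println(ttlString(15.minutes)) // 900s
```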
It is worth executing a simple check on Elasticsearch to make sure that the TTL parameter has been properly set and is effectively decreasing every second. Once it reaches 0, the indexed document should be dropped. The following command prints out the _ttl value for document ID [AVRr9LaCoYjYhZG9lvBl] every second. It uses the jq utility (https://stedolan.github.io/jq/download/) to parse JSON objects from the command line:
$ while true ; do
    TTL=`curl -XGET 'http://localhost:9200/gzet/twitter/AVRr9LaCoYjYhZG9lvBl' 2>/dev/null | jq "._ttl"`
    echo "TTL is $TTL"
    sleep 1
  done
../..
TTL is 48366081
TTL is 48365060
TTL is 48364038
TTL is 48363016
../..
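Note that the _ttl readings above drop by roughly 1,000 per one-second tick, which confirms they are expressed in milliseconds. As a quick sanity check, one reading can be converted back into the remaining lifetime (the value below is taken from the output above):

```scala
// One of the _ttl readings from the loop above, in milliseconds
val ttlMs = 48366081L

// 3,600,000 milliseconds per hour
val remainingHours = ttlMs / 3600000.0
println(f"$remainingHours%.2f hours remaining") // 13.44 hours remaining
```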
All the online records (records with an unexpired TTL) can be retrieved into an RDD using the following function. Similar to what we did in Chapter 7, Building communities, extracting lists from Elasticsearch is far easier using JSON parsing than Spark DataFrames:
import org.apache.spark.SparkContext
import org.elasticsearch.spark._
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

def getOnlineRecords(sc: SparkContext) = {
  sc.esJsonRDD("gzet/twitter").values map { jsonStr =>
    implicit val format = DefaultFormats
    val json = parse(jsonStr)
    val tags = (json \ "tags").extract[Array[String]]
    val body = (json \ "body").extract[String]
    (body, tags)
  }
}
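The json4s extraction logic can be sanity-checked in isolation against a literal document. The sample JSON below is made up for illustration; it only assumes the json4s dependency already used in this chapter:

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

// A made-up document mirroring the fields of the gzet/twitter mapping
val sample = """{"body": "some tweet text", "tags": ["tag1", "tag2"]}"""

val json = parse(sample)
val body = (json \ "body").extract[String]
val tags = (json \ "tags").extract[Array[String]]

println(body)               // some tweet text
println(tags.mkString(",")) // tag1,tag2
```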
We download all the Twitter content from our caching layer while saving our current batch. The remaining step is to train our classification algorithm; this method is discussed in the following section:
twitterContent foreachRDD { batch =>

  val sc = batch.sparkContext
  batch.cache()

  if(batch.count() > 0) {
    val window = getOnlineRecords(sc)
    saveCurrentBatch(batch)
    val trainingSet = batch.union(window)
    //Train method described hereafter
    trainAndSave(trainingSet, modelOutputDir)
  }

  batch.unpersist(blocking = false)
}