Data processing pipeline for Entree

In this section, we will design a pipeline that allows us to both stream and persist the data. Persistence lets us look up the data on demand, while streaming lets the learning algorithm keep learning as soon as new data arrives.

We use the following technologies to achieve our goals:

  • Akka: For message-based concurrency, so that we can delegate as much work as possible
  • MongoDB: For data persistence and querying
  • Apache Kafka: For high-performance persistent queuing
  • Apache Spark: For high-throughput stream processing

We have already covered the setup of MongoDB and Apache Kafka in this chapter, and Apache Spark in the previous chapter. We only need to add the Apache Spark Streaming library to our build file, build.sbt:

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0"

We are all set with the required dependencies to create a streaming example. This is essentially the flow of data across different components:

Entree dataset text files → Akka → MongoDB → Apache Kafka → Apache Spark

The entry point to this pipeline is the main() method of the EntreeDatasetPipeline Scala object, which is defined in EntreeDatasetPipeline.scala. This starts the MainActor, which reads the Entree dataset from text files and parses the entries into case classes using plain Scala code.
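As a rough sketch (not the book's exact code), the case class and a line parser might take the following shape. The tab-separated layout and the parser name are assumptions made here for illustration; the field names mirror the MongoDB documents shown later in this chapter:

case class Restaurant(id: String, name: String, features: List[String], city: String)

object RestaurantParser {
  // Parse one line of a per-city data file, assumed to look like:
  // "id<TAB>name<TAB>space-separated feature codes"
  def parse(line: String, city: String): Restaurant = {
    val parts = line.split("\t")
    Restaurant(
      id = parts(0).trim,
      name = parts(1).trim,
      features = parts.drop(2).flatMap(_.split("\\s+")).toList,
      city = city)
  }
}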


As soon as the entries are read from the text files, MainActor delegates the task of storing them in MongoDB to DBPersistenceActor. MongoDB serves as a persistent datastore, both for archival and for querying the data as and when needed.
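A much-simplified version of such a persistence actor is sketched below. The PersistRestaurant message, the collection name, and the use of the Casbah MongoDB driver are assumptions for illustration; it reuses the Restaurant case class sketched earlier:

import akka.actor.Actor
import com.mongodb.casbah.Imports._

// Illustrative message protocol for the persistence actor.
case class PersistRestaurant(r: Restaurant)

class DBPersistenceActor extends Actor {
  // Connect to the local MongoDB instance and the entree database.
  val collection = MongoClient("localhost", 27017)("entree")("restaurants")

  def receive = {
    case PersistRestaurant(r) =>
      // Convert the case class into a Mongo document and insert it.
      collection.insert(
        MongoDBObject(
          "id" -> r.id,
          "name" -> r.name,
          "features" -> r.features,
          "city" -> r.city))
  }
}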

MainActor also delegates the task of enqueuing the user session data to the Apache Kafka queue. This queue becomes the source of the stream processing that we will do with Apache Spark.
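The enqueuing side could look roughly like the following sketch, assuming the Kafka 0.8.x Scala producer API and a hypothetical topic named entree-sessions; the actual topic name and message format used in the book may differ:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object SessionProducer {
  private val props = new Properties()
  props.put("metadata.broker.list", "localhost:9092")
  props.put("serializer.class", "kafka.serializer.StringEncoder")

  private val producer = new Producer[String, String](new ProducerConfig(props))

  // Publish one line of user session data to the queue.
  def send(sessionLine: String): Unit =
    producer.send(new KeyedMessage[String, String]("entree-sessions", sessionLine))
}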


Separately, in the main() method of EntreeDatasetPipeline, a Spark stream is created that listens to the user session data; this completes the processing pipeline. A key piece to study here is the SessionDataReceiver class, which implements the receiver that pushes data into the Spark stream. Finally, the Spark stream repeatedly prints the most visited restaurants in Chicago in the recent past.
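The general shape of a custom receiver and the windowed counting logic is sketched below. This is an illustrative skeleton only: it assumes the receiver delivers one restaurant name per visit as a plain string, and the window and batch durations are placeholders rather than the book's actual values:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Skeleton of a custom receiver; the real SessionDataReceiver reads
// session entries from Kafka and calls store() for each one.
class SessionDataReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    // Start a background thread that calls store(restaurantName)
    // for every visit pulled from the queue (details omitted).
  }
  def onStop(): Unit = { /* release any resources acquired in onStart() */ }
}

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("entree-streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val visits = ssc.receiverStream(new SessionDataReceiver())

    // Count visits per restaurant over a sliding window and print the top five.
    visits.map(name => (name, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))
      .foreachRDD { rdd =>
        println("Next batch...")
        rdd.top(5)(Ordering.by[(String, Int), Int](_._2)).foreach {
          case (name, count) => println(s"$name was recently visited $count times")
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}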


Let's see some output now. First ensure that both the MongoDB and Apache Kafka servers are running, as mentioned earlier in this chapter. Then invoke the pipeline:

$ sbt 'run-main chapter02.EntreeDatasetPipeline /home/tuxdna/work/packt/dataset/entree'
Number of restuarants: 4160

... OUTPUT SKIPPED ...

Next batch...

foodlife was recently visited 86 times
Anna Maria Pasteria was recently visited 71 times
Stanley's Kitchen & Tap was recently visited 59 times
Poor Phil's Oyster Bar was recently visited 54 times
Big Bowl Cafe was recently visited 53 times

Next batch...

foodlife was recently visited 88 times
Stanley's Kitchen & Tap was recently visited 63 times
Anna Maria Pasteria was recently visited 58 times
Poor Phil's Oyster Bar was recently visited 53 times
TRIO was recently visited 44 times

Next batch...

foodlife was recently visited 95 times
Anna Maria Pasteria was recently visited 80 times
Stanley's Kitchen & Tap was recently visited 72 times
Poor Phil's Oyster Bar was recently visited 51 times
Dave's Italian Kitchen was recently visited 48 times
Loading session from: /home/tuxdna/work/packt/dataset/entree/session/session.1999-Q2
    Number of recorded sessions: 1299

... OUTPUT SKIPPED ...

This output is interesting. It seems that foodlife has been very popular recently. Notice that we are not storing these patterns anywhere just yet; this can easily be achieved by storing an aggregate over all the past data in MongoDB, as sketched below. There are many more things that can be done from here.
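One way to persist such an aggregate, under the same illustrative assumptions as before (the Casbah driver and a hypothetical visit_counts collection), is to upsert a running count per restaurant after every batch:

import com.mongodb.casbah.Imports._

// Illustrative only: keep a running visit total per restaurant.
object VisitCountStore {
  private val collection = MongoClient("localhost", 27017)("entree")("visit_counts")

  def save(counts: Seq[(String, Int)]): Unit =
    counts.foreach { case (name, count) =>
      collection.update(
        MongoDBObject("name" -> name), // match on restaurant name
        $inc("count" -> count),        // add this batch's visits to the total
        upsert = true)
    }
}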

While the pipeline is running, you could also inspect MongoDB to ensure that the data is actually being populated there too:

$ mongo
MongoDB shell version: 2.6.3
connecting to: test
> use entree
switched to db entree
> db.restaurants.find()
{ "_id" : ObjectId("5549d273e4b00867355c7bcb"), "id" : "0000000", "name" : "Tanner's", "features" : [ "100", "253", "250", "178", "174", "063", "059", "036", "008", "074", "204", "052", "163" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bcc"), "id" : "0000001", "name" : "Frijoleros", "features" : [ "250", "062", "132", "174", "063", "197", "071", "142", "234", "243", "075", "204", "052", "162" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bcd"), "id" : "0000002", "name" : "Indian Delights", "features" : [ "253", "250", "150", "174", "083", "059", "036", "117", "243", "076", "205", "051", "162" ], "city" : "atlanta" }
... OUTPUT SKIPPED ...
{ "_id" : ObjectId("5549d273e4b00867355c7bdc"), "id" : "0000017", "name" : "Jonathan Lee's", "features" : [ "253", "231", "245", "191", "192", "174", "059", "036", "039", "235", "075", "205", "052", "163" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bdd"), "id" : "0000018", "name" : "The Country Place", "features" : [ "253", "099", "231", "250", "062", "132", "191", "192", "174", "071", "083", "024", "215", "005", "076", "206", "054", "165" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bde"), "id" : "0000019", "name" : "Hama's", "features" : [ "253", "250", "245", "174", "128", "075", "205", "052", "164" ], "city" : "atlanta" }
Type "it" for more
>

Indeed, it works! We could also view the session data using the db.sessions.find() command in the MongoDB shell. That is left for the reader to explore.

That completes our task for this chapter, but there are several questions worth pondering after this exercise.

How does a pipeline relate to a recommendation engine?

In a typical recommendation system:

  • We obtain the data of historical transactions along with other metadata.
  • We convert these historical transactions into appropriate instances and features.
  • We model the information we already have about the user and their likes.
  • We only know what a user has already liked in the past. That means we are merely predicting the future based on past experience. It is only an estimate, which we want to be as accurate as possible.
  • We sometimes interpret interesting as surprising: something that a user has not encountered before but would possibly like.

On close observation, we can see that the preceding list is a sequence of steps that keep repeating in succession. This is exactly like the pipeline we created earlier, although a good recommendation engine requires much more effort than that.

How does it relate to information retrieval?

If there were, say, only five restaurants, it would be easy for a user to decide based on cuisine, location, cost, and so on. However, this is not possible when there are many thousands of options. The problem then becomes, in a way, one of search, relevance, and taste: a user is searching for a restaurant and wants only those that are relevant to his or her taste. Now, all we need to do is encode the problem of recommendation in terms of search, relevance, and taste.
