In this section, we will design a pipeline that will allow us to stream as well as persist the data. The persistence will allow us to look up the data on demand. Streaming will allow the learning algorithm to keep learning as soon as new data arrives.
We use the following technologies to achieve our goals:

- Akka, to read the dataset and route the entries
- MongoDB, to persist the data
- Apache Kafka, to queue the user session data
- Apache Spark (with Spark Streaming), to process the stream of data
We have already covered the setup of MongoDB and Apache Kafka in this chapter, and Apache Spark in the previous chapter. We only need to add the Apache Spark Streaming library to our build file, build.sbt:
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0"
We are all set with the required dependencies to create a streaming example. This is essentially the flow of data across different components:
Entree dataset text files → Akka → MongoDB → Apache Kafka → Apache Spark
The entry point to this pipeline is the main() method of the EntreeDatasetPipeline Scala object, which is defined in EntreeDatasetPipeline.scala. It starts the MainActor, which reads the entries from the Entree dataset text files and parses them into case classes. This is done using plain Scala code.
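As a rough sketch of this parsing step, the following shows how a line describing a restaurant could be turned into a case class. The field layout assumed here (a tab-separated id and name followed by space-separated feature codes) is an illustration; the actual Entree file format and the book's case class definitions may differ:

```scala
// Hypothetical parser for one line of an Entree restaurant file.
// Assumed layout: id <TAB> name <TAB> space-separated feature codes.
case class Restaurant(id: String, name: String, features: List[String], city: String)

def parseRestaurant(line: String, city: String): Option[Restaurant] =
  line.split("\t") match {
    case Array(id, name, features) =>
      // Split the feature codes on whitespace and keep them as strings
      Some(Restaurant(id, name, features.trim.split("\\s+").toList, city))
    case _ =>
      None // malformed lines are skipped rather than crashing the actor
  }
```

Returning an Option lets the caller silently drop malformed lines, which is a common choice when ingesting messy text data.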
As soon as entries are read from the text files, MainActor delegates the task of storing them in MongoDB to DBPersistenceActor. MongoDB serves as a persistent datastore, both for archival and for querying the data as and when needed.
MainActor also delegates the task of enqueuing the user session data to the Apache Kafka queue. This queue becomes the source of the stream processing that we will do using Apache Spark.
Now, separately in the main() method of EntreeDatasetPipeline, a Spark stream is created that listens to the user session data. This completes the processing pipeline. A key piece to study here is the SessionDataReceiver class, which implements the receiver that pushes data into the Spark stream. Finally, the Spark stream repeatedly prints the most visited restaurants in Chicago in the recent past.
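The per-batch aggregation the stream performs can be sketched in plain Scala: count the visits per restaurant and keep the top few. The function name and shape here are illustrative, not taken from the book's code:

```scala
// Count occurrences of each restaurant name in a batch of visits and
// return the n most visited, ordered by descending visit count.
def topVisited(visits: Seq[String], n: Int): Seq[(String, Int)] =
  visits
    .groupBy(identity)                                    // group identical names
    .map { case (name, occurrences) => (name, occurrences.size) }
    .toSeq
    .sortBy { case (_, count) => -count }                 // most visited first
    .take(n)
```

In the actual pipeline, the equivalent computation runs over each micro-batch of the Spark stream rather than over an in-memory Seq.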
Let's see some output now. First ensure that both the MongoDB and Apache Kafka servers are running, as mentioned earlier in this chapter. Then invoke the pipeline:
$ sbt 'run-main chapter02.EntreeDatasetPipeline /home/tuxdna/work/packt/dataset/entree'
Number of restuarants: 4160
... OUTPUT SKIPPED ...
Next batch...
foodlife was recently visited 86 times
Anna Maria Pasteria was recently visited 71 times
Stanley's Kitchen & Tap was recently visited 59 times
Poor Phil's Oyster Bar was recently visited 54 times
Big Bowl Cafe was recently visited 53 times
Next batch...
foodlife was recently visited 88 times
Stanley's Kitchen & Tap was recently visited 63 times
Anna Maria Pasteria was recently visited 58 times
Poor Phil's Oyster Bar was recently visited 53 times
TRIO was recently visited 44 times
Next batch...
foodlife was recently visited 95 times
Anna Maria Pasteria was recently visited 80 times
Stanley's Kitchen & Tap was recently visited 72 times
Poor Phil's Oyster Bar was recently visited 51 times
Dave's Italian Kitchen was recently visited 48 times
Loading session from: /home/tuxdna/work/packt/dataset/entree/session/session.1999-Q2
Number of recorded sessions: 1299
... OUTPUT SKIPPED ...
This output is interesting. It seems the restaurant foodlife has been very popular recently. Notice that we are not storing these patterns anywhere just yet. This could easily be achieved by storing an aggregate over all the past data in MongoDB. There are many things that can be done from here on.
While the pipeline is running, you could also inspect MongoDB to ensure that the data is actually being populated there too:
$ mongo
MongoDB shell version: 2.6.3
connecting to: test
> use entree
switched to db entree
> db.restaurants.find()
{ "_id" : ObjectId("5549d273e4b00867355c7bcb"), "id" : "0000000", "name" : "Tanner's", "features" : [ "100", "253", "250", "178", "174", "063", "059", "036", "008", "074", "204", "052", "163" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bcc"), "id" : "0000001", "name" : "Frijoleros", "features" : [ "250", "062", "132", "174", "063", "197", "071", "142", "234", "243", "075", "204", "052", "162" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bcd"), "id" : "0000002", "name" : "Indian Delights", "features" : [ "253", "250", "150", "174", "083", "059", "036", "117", "243", "076", "205", "051", "162" ], "city" : "atlanta" }
... OUTPUT SKIPPED ...
{ "_id" : ObjectId("5549d273e4b00867355c7bdc"), "id" : "0000017", "name" : "Jonathan Lee's", "features" : [ "253", "231", "245", "191", "192", "174", "059", "036", "039", "235", "075", "205", "052", "163" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bdd"), "id" : "0000018", "name" : "The Country Place", "features" : [ "253", "099", "231", "250", "062", "132", "191", "192", "174", "071", "083", "024", "215", "005", "076", "206", "054", "165" ], "city" : "atlanta" }
{ "_id" : ObjectId("5549d273e4b00867355c7bde"), "id" : "0000019", "name" : "Hama's", "features" : [ "253", "250", "245", "174", "128", "075", "205", "052", "164" ], "city" : "atlanta" }
Type "it" for more
>
Indeed, it works! We could also inspect the session data using the db.sessions.find() command in the MongoDB shell. That is left for the reader to explore.
That completes our task for this chapter, but there are several questions worth pondering after this exercise.
How does a pipeline relate to a recommendation engine?
In a typical recommendation system:

1. Users interact with items, generating activity data
2. This activity data is captured and persisted
3. A learning algorithm processes the data to update its model
4. Fresh recommendations are served back to the users
On close observation, we can see that the preceding list is a sequence of steps that repeats continuously. This is exactly like the pipeline we created earlier, although we need to put in much more effort to create a good recommendation engine.
If there were, say, only five restaurants, it would be easy for a user to decide based on the cuisine, location, cost, and so on. However, this is not possible when there are many thousands of options. This is, in a way, a problem of search, relevance, and taste: a user is searching for a restaurant, and only those that are relevant to his or her taste matter. Now, all we need to do is encode the problem of recommendation in terms of search, relevance, and taste.
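As one hedged illustration of encoding relevance to taste, we could score each restaurant by the Jaccard overlap between its feature codes and the set of features the user has shown a preference for. This scoring scheme is only a sketch of the idea, not the approach developed in this book:

```scala
// Illustrative model: a place is described by a set of feature codes
// (like the Entree feature lists), and a user's taste is another set.
final case class Place(name: String, features: Set[String])

// Jaccard similarity: |A ∩ B| / |A ∪ B|, in [0, 1].
def jaccard(a: Set[String], b: Set[String]): Double = {
  val unionSize = (a union b).size
  if (unionSize == 0) 0.0 else (a intersect b).size.toDouble / unionSize
}

// Rank places by how well their features overlap with the user's taste.
def rankByTaste(taste: Set[String], places: Seq[Place]): Seq[Place] =
  places.sortBy(p => -jaccard(taste, p.features))
```

Real recommendation engines go well beyond set overlap, but even this toy ranking makes the search/relevance/taste framing concrete.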