Building a recommender

Now that we've explored our song analyzer, let's get back on track with the recommendation engine. As discussed earlier, we would like to recommend songs based on frequency hashes extracted from audio signals. Taking as an example the dispute between Led Zeppelin and Spirit, we would expect both songs to be relatively close to each other, as the allegation is that they share a melody. Using this thought as our main assumption, we could potentially recommend Taurus to someone interested in Stairway to Heaven.

The PageRank algorithm

Instead of recommending a specific song, we will recommend playlists. A playlist would consist of a list of all our songs ranked by relevance, most to least relevant. Let's begin with the assumption that people listen to music in a similar way to the way they browse articles on the web, that is, following a logical path from link to link, but occasionally switching direction, or teleporting, and browsing to a totally different website. Continuing with the analogy, while listening to music one can either carry on listening to music of a similar style (and hence follow their most expected journey), or skip to a random song in a totally different genre. It turns out that this is exactly how Google ranks websites by popularity using a PageRank algorithm.
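The random-surfer idea can be captured in a few lines of plain Scala. The following is a minimal power-iteration sketch, assuming a toy graph, illustrative song names, and a 15% teleport probability; it is not the GraphX implementation we use later:

```scala
// A minimal PageRank power-iteration sketch. The graph, song names, and
// parameter values are illustrative assumptions, not part of our pipeline.
object PageRankSketch {

  def pageRank(
      links: Map[String, Seq[String]],
      damping: Double = 0.85,     // 85% follow a link, 15% teleport
      iterations: Int = 50): Map[String, Double] = {

    val nodes = links.keySet ++ links.values.flatten
    val n = nodes.size
    var ranks: Map[String, Double] = nodes.map(_ -> 1.0 / n).toMap

    for (_ <- 1 to iterations) {
      // Rank mass each node distributes evenly along its outgoing links
      val contributions = links.toSeq
        .flatMap { case (node, outs) =>
          outs.map(dest => (dest, ranks(node) / outs.size))
        }
        .groupBy { case (dest, _) => dest }
        .map { case (dest, cs) => dest -> cs.map(_._2).sum }

      ranks = nodes.map { node =>
        // Teleport share plus damped incoming contributions
        node -> ((1 - damping) / n +
                 damping * contributions.getOrElse(node, 0.0))
      }.toMap
    }
    ranks
  }
}
```

The node with the most incoming links ends up with the highest rank, which is exactly the behavior we exploit below, with shared hashes playing the role of links.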

Note

For more details on the PageRank algorithm visit: http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf.

The popularity of a website is measured by the number of links pointing to it (and, recursively, by the popularity of the pages it is referred from). In our music use case, popularity is built from the number of hashes a given song shares with all of its neighbors. Instead of popularity, we introduce the concept of song commonality.
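To make commonality concrete, here is a tiny standalone sketch, using hypothetical hash values, of how a song's shared hashes can be normalized by its own hash count (the same normalization our graph edges will use):

```scala
// Hypothetical hash inventories; the hash values and song names here are
// illustrative assumptions only.
object Commonality {

  val hashes: Map[String, Set[String]] = Map(
    "StairwayToHeaven" -> Set("h1", "h2", "h3", "h4"),
    "Taurus"           -> Set("h1", "h2", "h3"),
    "Kashmir"          -> Set("h4", "h5"))

  // Commonality of `from` towards `to`: hashes shared with `to`,
  // normalized by the total number of hashes observed for `from`
  def commonality(from: String, to: String): Double =
    (hashes(from) intersect hashes(to)).size.toDouble / hashes(from).size
}
```

Note that the measure is asymmetric: a song whose hashes are largely contained within another's scores higher towards it than the other way around.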

Building a Graph of Frequency Co-occurrence

We start by reading our hash values back from Cassandra and re-establishing the list of song IDs for each distinct hash. Once we have this, we can count the number of hashes for each song using a simple reduceByKey function and, because the audio library is relatively small, we collect the result and broadcast it to our Spark executors:

val hashSongsRDD = sc.cassandraTable[HashSongsPair]("gzet", "hashes")

val songHashRDD = hashSongsRDD flatMap { hash =>
  hash.songs map { song =>
    ((hash, song), 1)
  }
}

val songTfRDD = songHashRDD map { case ((hash, songId), count) =>
  (songId, count)
} reduceByKey(_+_)

val songTfB = sc.broadcast(songTfRDD.collectAsMap())

Next, we build a co-occurrence matrix by taking the cross product of every pair of songs sharing the same hash value and counting how many times each tuple is observed. Finally, we wrap the song IDs and the normalized frequency count (using the term frequency we just broadcast) inside an Edge class from GraphX:

implicit class Crossable[X](xs: Traversable[X]) {
  def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
}

// songTfB and minSimilarityB are the broadcast term frequencies and the
// broadcast minimum similarity threshold, respectively
val crossSongRDD = songHashRDD.keys
  .groupByKey()
  .values
  .flatMap { songIds =>
    (songIds cross songIds)
      .filter { case (from, to) => from != to }
      .map(_ -> 1)
  }
  .reduceByKey(_+_)
  .map { case ((from, to), count) =>
    val weight = count.toDouble /
                 songTfB.value.getOrElse(from, 1)
    Edge(from, to, weight)
  }
  .filter { edge =>
    edge.attr > minSimilarityB.value
  }
 
val graph = Graph.fromEdges(crossSongRDD, 0L)

We only keep edges with a weight (meaning a hash co-occurrence count) greater than a predefined threshold in order to build our hash frequency graph.

Running PageRank

Contrary to what one would normally expect when running PageRank, our graph is undirected. It turns out that, for our recommender, the lack of direction does not matter, since we are simply trying to find similarities between Led Zeppelin and Spirit. A possible way of introducing direction would be to look at the song publishing date. In order to find musical influences, we could introduce a chronology from the oldest to the newest songs, giving directionality to our edges.
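As a hedged sketch of that idea, assuming we had a (hypothetical) map of song IDs to release years, each co-occurrence edge could be oriented from the older song to the newer one:

```scala
// Illustrative only: orient an undirected co-occurrence edge
// chronologically, so that rank flows from influences to the influenced.
object ChronologicalEdges {

  case class DirectedEdge(from: Long, to: Long, weight: Double)

  def orient(
      songA: Long,
      songB: Long,
      weight: Double,
      releaseYear: Map[Long, Int]): DirectedEdge =
    if (releaseYear(songA) <= releaseYear(songB))
      DirectedEdge(songA, songB, weight)   // songA is older (or same year)
    else
      DirectedEdge(songB, songA, weight)
}
```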

In the following pageRank call, we define a 15% probability of skipping, or teleporting as it is known, to any random song, but this can obviously be tuned for different needs:

val prGraph = graph.pageRank(0.001, 0.15)

Finally, we extract the page-ranked vertices and save them as a playlist in Cassandra via an RDD of the Song case class:

case class Song(id: Long, name: String, commonality: Double)

val vertices = prGraph
  .vertices
  .mapPartitions { vertices =>
    // songIdsB is the broadcast map of song ID to song name
    val songIds = songIdsB.value
    vertices map { case (songId, pr) =>
      val songName = songIds(songId)
      Song(songId, songName, pr)
    }
  }

vertices.saveAsCassandraTable("gzet", "playlist")

The reader may be pondering the exact purpose of PageRank here, and how it can be used as a recommender. In fact, our use of PageRank means that the highest-ranking songs will be the ones that share many frequencies with other songs. This could be due to a common arrangement, key theme, or melody; or maybe because a particular artist was a major influence on a musical trend. Either way, these songs should, at least in theory, be more popular (by virtue of the fact that they occur more often), meaning that they are more likely to have mass appeal.

On the other end of the spectrum, low-ranking songs are ones where we did not find any similarity with anything we know. Either these songs are so avant-garde that no one has explored these musical ideas before, or they are so bad that no one ever wanted to copy them! Maybe they were even composed by that up-and-coming artist you were listening to in your rebellious teenage years. Either way, the chance of a random user liking these songs is treated as negligible. Surprisingly, whether it is pure coincidence or whether this assumption really makes sense, the lowest-ranked song in this particular audio library is Daft Punk's Motherboard; it is quite an original title (a brilliant one, though) with a definitively unique sound.

Building personalized playlists

We have just seen that a simple PageRank can help us create a general-purpose playlist. Although this isn't targeted at any individual, it could serve as a playlist for a random user. It is the best recommendation we can make without any information about a user's preferences. The more we learn about a user, the better we can personalize the playlist towards what they truly prefer. To do this, we could follow a content-based recommendation approach.

Without up-front information about a user's preferences, we can seek to collect our own information whenever a user plays a song, and hence personalize their playlist at runtime. To do this, we will assume that our user was enjoying the previous song that they listened to. We will also need to disable teleporting and generate a new playlist that is seeded from that particular song ID.

PageRank and personalized PageRank are identical in the way that they compute scores (using the weight of incoming/outgoing edges), but the personalized version only allows users to teleport to the provided ID. A simple modification of the code allows us to personalize PageRank using a certain community ID (see Chapter 7, Building Communities, for a definition of communities) or using a certain music attribute such as the artist or the genre. Given our previous graph, a personalized page rank is implemented as follows:

val graph = Graph.fromEdges(edgeRDD, 0L)
val prGraph = graph.personalizedPageRank(id, 0.001, 0.1)

Here, the chance of teleporting to a random song is zero. There is still a 10% chance of skipping, but only back to the provided song ID. In other words, regardless of the song we are currently listening to, we essentially define a 10% chance of jumping back to the song that we provided as a seed.
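To illustrate the difference, here is a standalone sketch (again, not the GraphX implementation) where the only change from regular PageRank is that the teleport mass is concentrated on the seed song instead of being spread uniformly; the toy graph and parameters are assumptions:

```scala
// Personalized PageRank sketch: teleporting always lands on the seed.
// Graph, seed, and parameter values are illustrative assumptions.
object PersonalizedPageRankSketch {

  def personalizedPageRank(
      links: Map[String, Seq[String]],
      seed: String,
      reset: Double = 0.1,        // 10% chance of jumping back to the seed
      iterations: Int = 50): Map[String, Double] = {

    val nodes = links.keySet ++ links.values.flatten
    // All the initial rank mass sits on the seed song
    var ranks: Map[String, Double] =
      nodes.map(n => n -> (if (n == seed) 1.0 else 0.0)).toMap

    for (_ <- 1 to iterations) {
      val contributions = links.toSeq
        .flatMap { case (node, outs) =>
          outs.map(dest => (dest, ranks(node) / outs.size))
        }
        .groupBy { case (dest, _) => dest }
        .map { case (dest, cs) => dest -> cs.map(_._2).sum }

      ranks = nodes.map { n =>
        // Unlike regular PageRank, the teleport share goes to the seed only
        val teleport = if (n == seed) reset else 0.0
        n -> (teleport + (1 - reset) * contributions.getOrElse(n, 0.0))
      }.toMap
    }
    ranks
  }
}
```

Even on a perfectly symmetric graph, the seed ends up with the highest score, so the resulting playlist is naturally biased towards the song the user just played.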

Expanding our cupcake factory

Similar to our song analyzer prototype, we want to present our suggested playlist back to our imaginary customer in a nice and tidy user interface.

Building a playlist service

Still using the Play framework, our technology stack stays the same; this time, we simply create a new endpoint (a new route):

GET       /playlist      controllers.Playlist.index

Just as before, we create an additional controller that handles simple GET requests (triggered when a user loads the playlist webpage). We load the generic playlist stored in Cassandra, wrap all these songs inside of a Playlist case class, and send it back to the playlist.scala.html view. The controller model looks as follows:

// Mapping over the Java driver's ResultSet relies on the implicit Java to
// Scala collection conversions being in scope
def getSongs: List[Song] = {
  val s = "SELECT id, name, commonality FROM gzet.playlist;"
  val results = session.execute(s)
  results.map { row =>
    val id = row.getLong("id")
    val name = row.getString("name")
    val popularity = row.getDouble("commonality")
    Song(id, name, popularity)
  }.toList
}

 def index = Action { implicit request =>
   val playlist = models.Playlist(getSongs)
   Ok(views.html.playlist(playlist))
 }

The view remains reasonably simple, as we iterate through all the songs to display, ordered by commonality (from the most to least common ones):

@(playlist: Playlist)
 
@displaySongs(playlist: Playlist) = {
   @for(node <- playlist.songs.sortBy(_.commonality).reverse) {
     <a href="/playlist/@node.id" class="list-group-item">
        <i class="glyphicon glyphicon-play"></i>
       <span class="badge">
         @node.commonality
       </span>
       @node.name
     </a>
   }
 }
 
 @main("playlist") {
   <div class="row">
     <div class="list-group">
       @displaySongs(playlist)
     </div>
   </div>
 }

Note

Note the href attribute in each list item: any time a user clicks on a song in that list, a new REST call will be generated to the /playlist/id endpoint (this is described in the following section).

Finally, we are pleased to unveil the recommended (generic) playlist in Figure 7. For some reason unknown to us, a novice to classical music should apparently start by listening to Gustav Mahler's Symphony No. 5.

Figure 7: Playlist recommender

Leveraging the Spark job server

Here comes yet another interesting challenge. Although our list of songs for the generic playlist, with its PageRank scores, is stored in Cassandra, this is not feasible for personalized playlists, as it would require pre-computing the PageRank scores for every possible song ID. Since we want to build personalized playlists in pseudo real time, and we might also be loading new songs fairly regularly, we need a better approach than starting up a SparkContext on every request.

The first constraint is that the PageRank function is by nature a distributed process and cannot be used outside the context of Spark (that is, inside our Play framework's JVM). We appreciate that creating a new Spark job on each HTTP request would certainly be overkill, so we would like to start one single Spark job and process new graphs only when needed, ideally through a simple REST API call.

The second challenge is that we do not wish to load the same graph dataset from Cassandra repeatedly. This should be loaded once and cached in Spark memory and shared across different jobs. In Spark terminology, this would require an RDD to be accessible from a shared context.

Luckily, both points are addressed with Spark job server (https://github.com/spark-jobserver/spark-jobserver). Although this project is fairly immature (or at least not quite production-ready yet), it is a perfectly viable solution for showcasing data science.

For the purpose of this book, we compile and deploy a Spark job server using a local configuration only. We strongly encourage the reader to have a deeper look at the job server website (see previous link) for more information about packaging and deployment. Once our server starts, we need to create a new context (meaning starting up a new Spark job) with additional configuration settings for handling connection to Cassandra. We give this context a name so that we can use it later on:

curl -XPOST 'localhost:8090/contexts/gzet?
  num-cpu-cores=4&
  memory-per-node=4g&
  spark.executor.instances=2&
  spark.driver.memory=2g&
  passthrough.spark.cassandra.connection.host=127.0.0.1&
  passthrough.spark.cassandra.connection.port=9042'

The next step is to modify our code to be Spark job server compliant. We need the following dependency:

<dependency>
   <groupId>spark.jobserver</groupId>
   <artifactId>job-server-api_2.11</artifactId>
   <version>spark-2.0-preview</version>
 </dependency>

We modify our object to implement the SparkJob interface that comes with the job server. This is a requirement of all Spark job server jobs:

object PlaylistBuilder extends SparkJob {
 
  override def runJob(
    sc: SparkContext,
    jobConfig: Config
  ): Any = ???
 
  override def validate(
    sc: SparkContext,
    config: Config
  ): SparkJobValidation = ???
 
}

In the validate method, we ensure that all the job requirements will be satisfied (such as the input configuration needed for that job), and in runJob we execute our normal Spark logic, just as we did before. The last change is that, while we will still be storing our generic playlist in Cassandra, we will cache the node and edge RDDs in Spark shared memory, where they will be made available to further jobs. This can be done by extending the NamedRddSupport trait.
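The real validate method returns SparkJobValid or SparkJobInvalid(reason) from the job server API; the following standalone sketch mirrors that contract for our personalized job, assuming (as per the REST call shown later) that it receives its seed through a song.id configuration entry:

```scala
// Standalone mirror of the job server validation contract; the Valid /
// Invalid types and the `song.id` key are assumptions for illustration.
object JobValidation {

  sealed trait Validation
  case object Valid extends Validation
  case class Invalid(reason: String) extends Validation

  def validate(config: Map[String, String]): Validation =
    config.get("song.id") match {
      case None =>
        Invalid("missing required configuration entry [song.id]")
      case Some(id) if id.nonEmpty && id.forall(_.isDigit) =>
        Valid
      case Some(id) =>
        Invalid(s"song.id must be a numeric ID, got [$id]")
    }
}
```

Failing fast in validate means a malformed request is rejected before any Spark stage is ever scheduled.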

We simply have to save both the edge and node RDDs (note that saving a Graph object itself is not supported yet) to keep accessing the graph in subsequent jobs:

this.namedRdds.update("rdd:edges", edgeRDD)
this.namedRdds.update("rdd:nodes", nodeRDD)

From the personalized Playlist job, we retrieve and process our RDDs as follows:

val edgeRDD = this.namedRdds.get[Edge]("rdd:edges").get
val nodeRDD = this.namedRdds.get[Node]("rdd:nodes").get

val graph = Graph.fromEdges(edgeRDD, 0L)

We then execute our personalized PageRank, but instead of saving the results back to Cassandra, we will simply collect the first 50 songs. When deployed, this action will implicitly output this list back to the client thanks to the magic of the job server:

val prGraph = graph.personalizedPageRank(id, 0.001, 0.1)

prGraph
 .vertices
 .map { case(vId, pr) =>
   List(vId, songIds.value.get(vId).get, pr).mkString(",")
  }
 .take(50)

We compile our code and publish our shaded jar file to the job server, giving it an application name as follows:

curl --data-binary @recommender-core-1.0.jar \
  'localhost:8090/jars/gzet'

Now that we are almost ready to deploy our recommendation system, let's recap what we are going to demo. We will shortly be executing two different user flows:

  • When a user logs in to the recommendation page, we retrieve the latest generic playlist available in Cassandra. Alternatively, we start a new asynchronous job to create a new one if needed. This will load the required RDDs within the Spark context.
  • When a user plays a new song from our recommended items, we spin up a synchronous call to the Spark job server and build the next playlist based around this song ID.

The flow for the generic PageRank playlist is shown in Figure 8:

Figure 8: Playlist recommender process

The flow for the personalized PageRank playlist is shown in Figure 9:

Figure 9: Personalized playlist recommender process

User interface

The final remaining piece of the puzzle is to call the Spark job server from the service layer in the Play framework. Although this is done programmatically using the java.net package, since it is a REST API, the equivalent curl requests are shown in the following snippet:

# Asynchronous Playlist Builder
curl -XPOST 'localhost:8090/jobs?
 context=gzet&
 appName=gzet&
 classPath=io.gzet.recommender.PlaylistBuilder'
 
# Synchronous Personalized Playlist for song 12
curl -XPOST -d "song.id=12" 'localhost:8090/jobs?
 context=gzet&
 appName=gzet&
 sync=true&
 timeout=60000&
 classPath=io.gzet.recommender.PersonalizedPlaylistBuilder'
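Inside the Play service layer, the same calls are issued with the java.net package; here is a hedged sketch of how the job-submission URI could be assembled (the helper object and its name are ours, not part of any library):

```scala
// Builds the job server submission URI matching the curl calls above.
// Host, context, and class names are whatever the deployment uses.
object JobServerClient {

  def jobUri(
      host: String,
      context: String,
      appName: String,
      classPath: String,
      sync: Boolean = false): String = {
    val params = Seq(
      s"context=$context",
      s"appName=$appName",
      s"classPath=$classPath") ++
      (if (sync) Seq("sync=true") else Nil)
    s"http://$host/jobs?" + params.mkString("&")
  }
  // A java.net.HttpURLConnection POST to this URI (with the song ID in the
  // request body for the personalized job) then returns the job output.
}
```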

Initially, when we built our HTML code, we introduced a link, or href, to /playlist/${id}. This REST call is converted into a GET request to the Playlist controller and bound to its personalize function, like so:

GET /playlist/:id controllers.Playlist.personalize(id: Long) 

The first call to the Spark job server will spin up a new Spark job synchronously, read the results back from the job output, and redirect to the same page view with an updated playlist, this time based around this song ID:

def personalize(id: Long) = Action { implicit request =>
   val name = cassandra.getSongName(id)
   try {
     val nodes = sparkServer.generatePlaylist(id)
     val playlist = models.Playlist(nodes, name)
     Ok(views.html.playlist(playlist))
   } catch {
     case e: Exception =>
       Redirect(routes.Playlist.index())
         .flashing("error" -> e.getMessage)
   }
 }

And voilà: the resulting UI is shown in Figure 10. Any time a user plays a song, the playlist will be updated and displayed, acting as a full-blown ranking recommendation engine.

Figure 10: Personalized playlist recommender process
