Inserting documents

Let's insert some documents into our newly created database. We want to store information about GitHub users, using the following document structure:

{
    _id: <mongodb object id>,
    login: "pbugnion",
    github_id: 1392879,
    repos: [ 
        {
            name: "scikit-monaco",
            id: 14821551,
            language: "Python"
        },
        {
            name: "contactpp",
            id: 20448325,
            language: "Python"
        }
    ]
}

Casbah provides a DBObject class to represent MongoDB documents (and subdocuments) in Scala. Let's start by creating a DBObject instance for each repository subdocument:

scala> val repo1 = DBObject("name" -> "scikit-monaco", "id" -> 14821551, "language" -> "Python")
repo1: DBObject = { "name" : "scikit-monaco" , "id" : 14821551, "language" : "Python"}

As you can see, a DBObject is just a list of key-value pairs, where the keys are strings. The values have compile-time type AnyRef, but Casbah will fail (at runtime) if you try to add a value that cannot be serialized.

We can also create DBObject instances from lists of key-value pairs directly. This is particularly useful when converting from a Scala map to a DBObject:

scala> val fields:Map[String, Any] = Map(
  "name" -> "contactpp",
  "id" -> 20448325,
  "language" -> "Python"
)
fields: Map[String, Any] = Map(name -> contactpp, id -> 20448325, language -> Python)

scala> val repo2 = DBObject(fields.toList)
repo2: DBObject = { "name" : "contactpp" , "id" : 20448325, "language" : "Python"}

The DBObject class provides many of the same methods as a map. For instance, we can address individual fields:

scala> repo1("name")
AnyRef = scikit-monaco

We can construct a new object by adding a field to an existing object:

scala> repo1 + ("fork" -> true)
mutable.Map[String,Any] = { "name" : "scikit-monaco" , "id" : 14821551, "language" : "Python", "fork" : true}

Note the return type: mutable.Map[String,Any]. Rather than implementing methods such as + directly, Casbah adds them to DBObject by providing an implicit conversion to and from mutable.Map.
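
To illustrate the mechanism, here is a minimal sketch of how an implicit conversion can lend map methods to a class that defines none of its own. The Doc class and the docToMap conversion are hypothetical names made up for this example; this is not Casbah's actual implementation, only the general Scala technique:

```scala
import scala.collection.mutable
import scala.language.implicitConversions

object ImplicitMapSketch {
  // Hypothetical document class: it stores fields but defines no map methods.
  final class Doc(val fields: Vector[(String, Any)])

  // Implicit view: when a map method is called on a Doc, the compiler
  // inserts a call to this conversion and calls the method on the result.
  implicit def docToMap(doc: Doc): mutable.Map[String, Any] =
    mutable.LinkedHashMap(doc.fields: _*)

  val repo = new Doc(Vector("name" -> "scikit-monaco", "language" -> "Python"))

  // Neither `contains` nor `keys` exists on Doc; both compile only
  // because docToMap is in scope.
  val hasName: Boolean = repo.contains("name")
  val keys: List[String] = repo.keys.toList
}
```

Because the methods live on mutable.Map rather than on the document class, their results are typed as maps, which matches the return type we observed in the REPL.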

New DBObject instances can also be created by concatenating two existing instances:

scala> repo1 ++ DBObject(
  "locs" -> 6342, 
  "description" -> "Python library for Monte Carlo integration"
)
DBObject = { "name" : "scikit-monaco" , "id" : 14821551, "language" : "Python", "locs" : 6342 , "description" : "Python library for Monte Carlo integration"}

DBObject instances can then be inserted into a collection using the += operator. Let's insert our first document into the users collection:

scala> val userDocument = DBObject(
  "login" -> "pbugnion", 
  "github_id" -> 1392879, 
  "repos" -> List(repo1, repo2)
)
userDocument: DBObject = { "login" : "pbugnion" , ... }

scala> val coll = MongoClient()("github")("users")
coll: com.mongodb.casbah.MongoCollection = users

scala> coll += userDocument
com.mongodb.casbah.TypeImports.WriteResult = WriteResult{, n=0, updateOfExisting=false, upsertedId=null}

A database containing a single document is a bit boring, so let's add a few more documents queried directly from the GitHub API. You learned how to query the GitHub API in the previous chapter, so we won't dwell on how to do this here.

The code examples for this chapter (available on GitHub at https://github.com/pbugnion/s4ds/tree/master/chap08) include a class called GitHubUserIterator, defined in the GitHubUserIterator.scala file. It queries the GitHub API (specifically the /users endpoint) for user documents, converts them to a case class, and offers them as an iterator. The easiest way to access the class is to open an SBT console in the directory of the code examples for this chapter. The iterator fetches users in increasing order of their GitHub ID:

scala> val it = new GitHubUserIterator
it: GitHubUserIterator = non-empty iterator

scala> it.next // Fetch the first user
User = User(mojombo,1,List(Repo(...

GitHubUserIterator returns instances of the User case class, defined as follows:

// User.scala
case class User(login:String, id:Long, repos:List[Repo])

// Repo.scala
case class Repo(name:String, id:Long, language:String)

Let's write a short program to fetch 500 users and insert them into the MongoDB database. We will need to authenticate with the GitHub API to retrieve these users. The constructor for GitHubUserIterator takes the GitHub OAuth token as an optional argument. We will inject the token through the environment, as we did in the previous chapter.

We first give the entire code listing before breaking it down. If you are typing this out, copy GitHubUserIterator.scala from the code examples for this chapter into the directory in which you are running the program, so that it can access the GitHubUserIterator class. The class relies on scalaj-http and json4s, so either copy the build.sbt file from the code examples or specify those packages as dependencies in your own build.sbt file.

// InsertUsers.scala

import com.mongodb.casbah.Imports._

object InsertUsers {

  /** GitHub OAuth token, read lazily from the environment. */
  lazy val token:Option[String] = sys.env.get("GHTOKEN") orElse {
    println("No token found: continuing without authentication")
    None
  }

  /** Transform a Repo instance to a DBObject */
  def repoToDBObject(repo:Repo):DBObject = DBObject(
    "github_id" -> repo.id,
    "name" -> repo.name,
    "language" -> repo.language
  )

  /** Transform a User instance to a DBObject */
  def userToDBObject(user:User):DBObject = DBObject(
    "github_id" -> user.id,
    "login" -> user.login,
    "repos" -> user.repos.map(repoToDBObject)
  )

  /** Insert a list of users into a collection. */
  def insertUsers(coll:MongoCollection)(users:Iterable[User]) {
    users.foreach { user => coll += userToDBObject(user) }
  }

  /** Fetch users from GitHub and pass them to `inserter` */
  def ingestUsers(nusers:Int)(inserter:Iterable[User] => Unit) {
    val it = new GitHubUserIterator(token)
    val users = it.take(nusers).toList
    inserter(users)
  }

  def main(args:Array[String]) {
    val coll = MongoClient()("github")("users")
    val nusers = 500
    coll.dropCollection()
    val inserter = insertUsers(coll)_
    ingestUsers(nusers)(inserter)
  }

}

Before diving into the details of how this program works, let's run it through SBT. You will want to query the API with authentication to avoid hitting the rate limit. Recall that we need to set the GHTOKEN environment variable:

$ GHTOKEN="e83638..." sbt
> runMain InsertUsers

The program will take about five minutes to run (depending on your Internet connection). To verify that the program works, we can query the number of documents in the users collection of the github database:

$ mongo github --quiet --eval "db.users.count()"
500

Let's break the code down. We first load the OAuth token to authenticate with the GitHub API. The token is stored as an environment variable, GHTOKEN. The token variable is a lazy val, so the token is loaded only when we formulate the first request to the API. We have already used this pattern in Chapter 7, Web APIs.
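
As a quick check of the lazy val behavior, the following sketch wraps the same lookup in an object and counts how often the initializer runs. The LazyTokenSketch object and the lookups counter are additions for this illustration only; they are not part of InsertUsers:

```scala
object LazyTokenSketch {
  // Counts how many times the initializer body runs (demonstration only).
  var lookups = 0

  // Same shape as `token` in InsertUsers: the body is evaluated on first
  // access and the result is cached for every later access.
  lazy val token: Option[String] = {
    lookups += 1
    sys.env.get("GHTOKEN") orElse {
      println("No token found: continuing without authentication")
      None
    }
  }
}
```

Before token is first read, lookups is 0. After any number of reads it is 1, and the warning message (if GHTOKEN is unset) is printed only once.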

We then define two methods to transform from classes in the domain model to DBObject instances:

def repoToDBObject(repo:Repo):DBObject = ...
def userToDBObject(user:User):DBObject = ...
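
To see the nested shape these two methods produce without a running MongoDB instance, here is a rough sketch that uses a plain Scala Map as a stand-in for DBObject. The Doc alias, repoToDoc, and userToDoc are names invented for this illustration and are not part of Casbah:

```scala
object DocumentShapeSketch {
  case class Repo(name: String, id: Long, language: String)
  case class User(login: String, id: Long, repos: List[Repo])

  // Assumption: a plain immutable Map stands in for DBObject here.
  type Doc = Map[String, Any]

  // Mirrors repoToDBObject: one flat document per repository.
  def repoToDoc(repo: Repo): Doc = Map(
    "github_id" -> repo.id,
    "name" -> repo.name,
    "language" -> repo.language
  )

  // Mirrors userToDBObject: repositories become a list of subdocuments.
  def userToDoc(user: User): Doc = Map(
    "github_id" -> user.id,
    "login" -> user.login,
    "repos" -> user.repos.map(repoToDoc)
  )

  val example: Doc = userToDoc(
    User("pbugnion", 1392879L, List(Repo("scikit-monaco", 14821551L, "Python")))
  )
}
```

The user document holds its repositories as a list of nested documents under the "repos" key, which is the structure we outlined at the start of this section.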

Armed with these two methods, we can add users to our MongoDB collection easily:

def insertUsers(coll:MongoCollection)(users:Iterable[User]) {
  users.foreach { user => coll += userToDBObject(user) }
}

We used currying to split the arguments of insertUsers. This lets us use insertUsers as a function factory:

val inserter = insertUsers(coll)_

This creates a new function, inserter, with signature Iterable[User] => Unit that inserts users into coll. To see how this might come in useful, let's write a function to wrap the whole data ingestion process. This is how a first attempt at this function could look:

def ingestUsers(nusers:Int)(inserter:Iterable[User] => Unit) {
  val it = new GitHubUserIterator(token)
  val users = it.take(nusers).toList
  inserter(users)
}

Notice how ingestUsers takes, as its second argument, a function that specifies how the list of users is inserted into the database. This function encapsulates all the code specific to insertion into a MongoDB collection. If we decide, at some later date, that we hate MongoDB and must insert the documents into a SQL database or write them to a flat file, all we need to do is pass a different inserter function to ingestUsers. The rest of the code remains the same. This demonstrates the increased flexibility afforded by using higher-order functions: we can easily build a framework and let the client code plug in the components that it needs.
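
As a sketch of what plugging in a different inserter might look like, the example below swaps the MongoDB collection for an in-memory buffer. Everything except the User and Repo case classes is hypothetical: fakeUsers stands in for GitHubUserIterator so that the code runs without network access, and insertIntoBuffer is an invented alternative to insertUsers:

```scala
import scala.collection.mutable.ArrayBuffer

object PluggableInserterSketch {
  case class Repo(name: String, id: Long, language: String)
  case class User(login: String, id: Long, repos: List[Repo])

  // Hypothetical stand-in for GitHubUserIterator: an endless stream of
  // made-up users, so that no API access is needed.
  def fakeUsers: Iterator[User] =
    Iterator.from(1).map(i => User(s"user$i", i.toLong, Nil))

  // Same shape as the first version of ingestUsers.
  def ingestUsers(nusers: Int)(inserter: Iterable[User] => Unit): Unit = {
    val users = fakeUsers.take(nusers).toList
    inserter(users)
  }

  // Curried like insertUsers: the first argument list picks the destination.
  def insertIntoBuffer(buffer: ArrayBuffer[String])(users: Iterable[User]): Unit =
    users.foreach { user => buffer += user.login }

  val logins = ArrayBuffer.empty[String]

  // The function type annotation triggers the same partial application
  // as the trailing underscore used in the text.
  val inserter: Iterable[User] => Unit = insertIntoBuffer(logins)

  val collected: List[String] = {
    ingestUsers(3)(inserter)
    logins.toList
  }
}
```

Here ingestUsers is unaware of where the users end up: collected contains the logins user1, user2, and user3, and only the inserter would change if we targeted a different store.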

The ingestUsers method, as defined previously, has one problem: if the nusers value is large, it will consume a lot of memory in constructing the entire list of users. A better solution would be to break it down into batches: we fetch a batch of users from the API, insert them into the database, and move on to the next batch. This allows us to control memory usage by changing the batch size. It is also more fault tolerant: if the program crashes, we can just restart from the last successfully inserted batch.

The .grouped method, available on all iterables, is useful for batching. It returns an iterator over fragments of the original iterable:

scala> val it = (0 to 10)
it: Range.Inclusive = Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> it.grouped(3).foreach { println } // In batches of 3
Vector(0, 1, 2)
Vector(3, 4, 5)
Vector(6, 7, 8)
Vector(9, 10)

Let's rewrite our ingestUsers method to use batches. We will also add a progress report after each batch in order to give the user some feedback:

/**  Fetch users from GitHub and pass them to `inserter` */
def ingestUsers(nusers:Int)(inserter:Iterable[User] => Unit) {
  val batchSize = 100
  val it = new GitHubUserIterator(token)
  print("Inserted #users: ")
  it.take(nusers).grouped(batchSize).zipWithIndex.foreach {
    case (users, batchNumber) =>
      print(s"${batchNumber*batchSize} ")
      inserter(users)
  }
  println()
}

Let's look more closely at the line that starts with it.take(nusers). We start from the user iterator, it. We then take the first nusers. This returns an Iterator[User] that, instead of happily churning through every user in the GitHub database, will terminate after nusers. We then group this iterator into batches of 100 users. On an iterator, the .grouped method returns an Iterator[Seq[User]]: each batch is a fully built sequence, which is why it can be passed to inserter, which expects an Iterable[User]. We then zip each batch with its index so that we know which batch we are currently processing (we use this in the print statement). The .zipWithIndex method returns Iterator[(Seq[User], Int)]. We unpack this tuple in the loop using a case statement that binds users to the Seq[User] batch and batchNumber to the index. Let's run this through SBT:

$ GHTOKEN="2502761..." sbt 
> runMain InsertUsers
[info] Running InsertUsers
Inserted #users: 0 100 200 300 400
[success] Total time: 215 s, completed 01-Nov-2015 18:44:30
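
The same take, grouped, and zipWithIndex chain can be tried on a small scale without the GitHub API. In this sketch, an endless iterator of integers (an assumption standing in for the user iterator) is cut down to seven elements and split into batches of three:

```scala
object BatchingSketch {
  // Endless iterator standing in for the stream of GitHub users.
  def ids: Iterator[Int] = Iterator.from(1)

  // take(7) bounds the endless iterator, grouped(3) builds each batch as
  // a sequence, and zipWithIndex pairs every batch with its position.
  val batches: List[(Seq[Int], Int)] =
    ids.take(7).grouped(3).zipWithIndex.toList
}
```

The result holds three batches, (1, 2, 3), (4, 5, 6), and (7), paired with the indices 0, 1, and 2. As with the users, the last batch is simply shorter when the total is not a multiple of the batch size.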