Chapter 6. Slick – A Functional Interface for SQL

In Chapter 5, Scala and SQL through JDBC, we investigated how to access SQL databases with JDBC. Because interacting with JDBC directly feels somewhat unnatural from Scala, we extended it with custom wrappers that provide a functional interface and hide JDBC's imperative nature.

Given the difficulty of interacting with JDBC directly from Scala and the ubiquity of SQL databases, you would expect there to be existing Scala libraries that wrap JDBC. Slick is one such library.

Slick styles itself as a functional-relational mapping library, a play on the more traditional object-relational mapping name used to denote libraries that build objects from relational databases. It presents a functional interface to SQL databases, allowing the client to interact with them in a manner similar to native Scala collections.

FEC data

In this chapter, we will use a somewhat more involved example dataset. The Federal Election Commission (FEC) of the United States records all donations of more than $200 to presidential candidates. These records are publicly available. We will look at the donations for the campaign leading up to the 2012 general election that resulted in Barack Obama's re-election. The data includes donations to the two presidential candidates, Obama and Romney, as well as to the other contenders in the Republican primaries (there were no Democratic primaries).

In this chapter, we will take the transaction data provided by the FEC, store it in a table, and learn how to query and analyze it.

The first step is to acquire the data. If you have downloaded the code samples from the Packt website, you should already have two CSVs in the data directory of the code samples for this chapter. If not, you can download the files using the following links:

  • data.scala4datascience.com/fec/ohio.csv.gz (or ohio.csv.zip)
  • data.scala4datascience.com/fec/us.csv.gz (or us.csv.zip)

Decompress the two files and place them in a directory called data/ in the same location as the source code examples for this chapter. The data files correspond to the following:

  • The ohio.csv file is a CSV of all the donations made by donors in Ohio.
  • The us.csv file is a CSV of all the donations made by donors across the country. This is quite a large file, with six million rows.

The two CSV files contain identical columns. Use the Ohio dataset for more responsive behavior, or the nationwide data file if you want to wrestle with a larger dataset. The dataset is adapted from a list of contributions downloaded from http://www.fec.gov/disclosurep/PDownload.do.

Let's start by creating a Scala case class to represent a transaction. In the context of this chapter, a transaction is a single donation from an individual to a candidate:

// Transaction.scala
import java.sql.Date

case class Transaction(
  id:Option[Int], // unique identifier
  candidate:String, // candidate receiving the donation
  contributor:String, // name of the contributor
  contributorState:String, // contributor state
  contributorOccupation:Option[String], // contributor job
  amount:Long, // amount in cents
  date:Date // date of the donation
)

The code repository for this chapter includes helper functions in an FECData singleton object to load the data from CSVs:

scala> val ohioData = FECData.loadOhio
ohioData: s4ds.FECData = s4ds.FECData@718454de

Calling FECData.loadOhio or FECData.loadAll will create an FECData object with a single attribute, transactions, which is an iterator over all the donations coming from Ohio or the entire United States:

scala> val ohioTransactions = ohioData.transactions
ohioTransactions: Iterator[Transaction] = non-empty iterator

scala> ohioTransactions.take(5).foreach(println)
Transaction(None,Paul, Ron,BROWN, TODD W MR.,OH,Some(ENGINEER),5000,2011-01-03)
Transaction(None,Paul, Ron,DIEHL, MARGO SONJA,OH,Some(RETIRED),2500,2011-01-03)
Transaction(None,Paul, Ron,KIRCHMEYER, BENJAMIN,OH,Some(COMPUTER PROGRAMMER),20120,2011-01-03)
Transaction(None,Obama, Barack,KEYES, STEPHEN,OH,Some(HR EXECUTIVE / ATTORNEY),10000,2011-01-03)
Transaction(None,Obama, Barack,MURPHY, MIKE W,OH,Some(MANAGER),5000,2011-01-03)
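
The FECData object itself ships with the code repository. Purely for illustration, the following sketch shows how a loader like it might map a parsed CSV record onto a Transaction. The column order, the date format, and the dollars-to-cents conversion are assumptions, and a real loader would use a proper CSV parser, since fields such as the candidate name contain commas:

// Sketch only: build a Transaction from one already-parsed CSV record.
// Column order, date format, and the dollars-to-cents conversion are assumed.
import java.text.SimpleDateFormat

object TransactionParser {

  private val dateFormat = new SimpleDateFormat("dd-MMM-yy") // assumed date format

  def fromFields(fields: Seq[String]): Transaction = Transaction(
    id = None, // let the database assign the primary key
    candidate = fields(0),
    contributor = fields(1),
    contributorState = fields(2),
    contributorOccupation = if (fields(3).isEmpty) None else Some(fields(3)),
    amount = math.round(fields(4).toDouble * 100), // dollars -> cents
    date = new java.sql.Date(dateFormat.parse(fields(5)).getTime)
  )
}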

Now that we have some data to play with, let's try and put it in the database so that we can run some useful queries on it.

Importing Slick

To add Slick to the list of dependencies, you will need to add "com.typesafe.slick" %% "slick" % "2.1.0" to the list of dependencies in your build.sbt file. You will also need to make sure that Slick has access to a JDBC driver. In this chapter, we will connect to a MySQL database, and must, therefore, add the MySQL connector "mysql" % "mysql-connector-java" % "5.1.37" to the list of dependencies.
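
In your build.sbt, the two dependencies might look like the following (the Scala version shown is an assumption; use whichever 2.10.x or 2.11.x release your project targets):

// build.sbt (excerpt)
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "com.typesafe.slick" %% "slick" % "2.1.0",
  "mysql" % "mysql-connector-java" % "5.1.37"
)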

Slick is imported by importing a specific database driver. As we are using MySQL, we must import the following:

scala> import slick.driver.MySQLDriver.simple._
import slick.driver.MySQLDriver.simple._

To connect to a different flavor of SQL database, import the relevant driver. The easiest way of seeing what drivers are available is to consult the API documentation for the slick.driver package, which is available at http://slick.typesafe.com/doc/2.1.0/api/#scala.slick.driver.package. All the common SQL flavors are supported (including H2, PostgreSQL, MS SQL Server, and SQLite).

Defining the schema

Let's create a table to represent our transactions. We will use the following schema:

CREATE TABLE transactions(
    id INT(11) AUTO_INCREMENT PRIMARY KEY,
    candidate VARCHAR(254) NOT NULL,
    contributor VARCHAR(254) NOT NULL,
    contributor_state VARCHAR(2) NOT NULL,
    contributor_occupation VARCHAR(254),
    amount BIGINT(20) NOT NULL,
    date DATE 
);

Note that the donation amount is in cents. This allows us to use an integer field (rather than a fixed point decimal, or worse, a float).

Note

You should never use a floating point format to represent money or, in fact, any discrete quantity because floats cannot represent most fractions exactly:

scala> 0.1 + 0.2
Double = 0.30000000000000004

This seemingly nonsensical result occurs because neither 0.1 nor 0.2 can be represented exactly as a double, so their sum carries a small rounding error.

The following article, What Every Computer Scientist Should Know About Floating-Point Arithmetic, gives an extensive discussion of the limitations of the floating-point format:

http://docs.oracle.com/cd/E19957-01/806-3568/ncg_goldberg.html

To use Slick with tables in our database, we first need to tell Slick about the database schema. We do this by creating a class that extends the Table abstract class. The way in which a schema is defined is quite straightforward, so let's dive straight into the code. We will store our schema in a Tables singleton. We define a Transactions class that provides the mapping to go from collections of Transaction instances to SQL tables structured like the transactions table:

// Tables.scala

import java.sql.Date
import slick.driver.MySQLDriver.simple._

/** Singleton object for table definitions */
object Tables {

  // Transactions table definition
  class Transactions(tag:Tag)
  extends Table[Transaction](tag, "transactions") {
    def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
    def candidate = column[String]("candidate")
    def contributor = column[String]("contributor")
    def contributorState = column[String](
      "contributor_state", O.DBType("VARCHAR(2)"))
    def contributorOccupation = column[Option[String]](
      "contributor_occupation")
    def amount = column[Long]("amount")
    def date = column[Date]("date")

    def * = (id.?, candidate, contributor, 
      contributorState, contributorOccupation, amount, date) <> (
      Transaction.tupled, Transaction.unapply)
  }

  val transactions = TableQuery[Transactions]

}

Let's go through this line by line. We first define a Transactions class, which must take a Slick Tag object as its first argument. The Tag object is used by Slick internally to construct SQL statements. The Transactions class extends a Table object, passing it the tag and name of the table in the database. We could, optionally, have added a database name by extending Table[Transaction](tag, Some("fec"), "transactions") rather than just Table[Transaction](tag, "transactions"). The Table type is parametrized by Transaction. This means that running SELECT statements on the database returns Transaction objects. Similarly, we will insert data into the database by passing a transaction or list of transactions to the relevant Slick methods.

Let's look at the Transactions class definition in more detail. The body of the class starts by listing the database columns. For instance, the id column is defined as follows:

def id = column[Int]("id", O.PrimaryKey, O.AutoInc)

We tell Slick that it should read the column called id and transform it to a Scala integer. Additionally, we tell Slick that this column is the primary key and that it is auto-incrementing. The Slick documentation contains a list of available options for column.

The candidate and contributor columns are straightforward: we tell Slick to read these as String from the database. The contributor_state column is a little more interesting. Besides specifying that it should be read from the database as a String, we also tell Slick that it should be stored in the database with type VARCHAR(2).

The contributor_occupation column in our table can contain NULL values. When defining the schema, we pass the Option[String] type to the column method:

def contributorOccupation = column[Option[String]]("contributor_occupation")

When reading from the database, a NULL field will get converted to None for columns specified as Option[T]. Conversely, if the field has a value, it will be returned as Some(value).
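
As a small illustration (not part of the schema definition), this is how such an optional value is typically consumed on the Scala side:

// Illustration only: a NULL column arrives as None, a value as Some(...)
def describeOccupation(occupation: Option[String]): String = occupation match {
  case Some(job) => job            // the column held a value
  case None      => "NOT PROVIDED" // the column was NULL
}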

The last line of the class body is the most interesting part: it specifies how to transform the raw data read from the database into a Transaction object and how to convert a Transaction object to raw fields ready for insertion:

def * = (id.?, candidate, contributor, 
  contributorState, contributorOccupation, amount, date) <> (
  Transaction.tupled, Transaction.unapply)

The first part is just a tuple of fields to be read from the database: (id.?, candidate, contributor, contributorState, contributorOccupation, amount, date), with a small amount of metadata. The second part is a pair of functions that describe how to transform this tuple into a Transaction object and back. In this case, as Transaction is a case class, we can take advantage of the Transaction.tupled and Transaction.unapply methods automatically provided for case classes.
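
To see what these two functions do, here is a small round trip through a tuple. This is purely illustrative; Slick calls these functions for us when reading and writing rows:

// Transaction.unapply turns a Transaction into an Option of a tuple of its fields,
// and Transaction.tupled builds a Transaction back from such a tuple.
val t = Transaction(None, "Obama, Barack", "Doe, John", "TX",
  Some("ENGINEER"), 200L, java.sql.Date.valueOf("2010-06-22"))

val asTuple = Transaction.unapply(t).get       // Some(tuple of fields) -> tuple of fields
val roundTripped = Transaction.tupled(asTuple) // back to a Transaction equal to t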

Notice how we followed the id entry with .?. In our Transaction class, the donation id has the Option[Int] type, but the column in the database has the INT type with the additional O.AutoInc option. The .? suffix tells Slick to use the default value provided by the database (in this case, the database's auto-increment) if id is None.

Finally, we define the value:

val transactions = TableQuery[Transactions]

This is the handle that we use to actually interact with the database. For instance, as we will see later, to get a list of donations to Barack Obama, we run the following query (don't worry about the details of the query for now):

Tables.transactions.filter {_.candidate === "Obama, Barack"}.list

Let's summarize the parts of our Transactions mapper class:

  • The Transactions class must extend the Table abstract class parametrized by the type that we want to return: Table[Transaction].
  • We define the columns to read from the database explicitly using column, for example, def contributorState = column[String]("contributor_state", O.DBType("VARCHAR(2)")). The [String] type parameter defines the Scala type that this column gets read as. The first argument is the SQL column name. Consult the Slick documentation for a full list of additional arguments (http://slick.typesafe.com/doc/2.1.0/schemas.html).
  • We describe how to convert from a tuple of the column values to a Scala object and vice versa using def * = (id.?, candidate, ...) <> (Transaction.tupled, Transaction.unapply).

Connecting to the database

So far, you have learned how to define Table classes that encode the transformation from rows in a SQL table to Scala case classes. To move beyond table definitions and start interacting with a database server, we must connect to a database. As in the previous chapter, we will assume that there is a MySQL server running on localhost on port 3306.

We will use the console to demonstrate the functionality in this chapter, but you can find an equivalent sample program in SlickDemo.scala. Let's open a Scala console and connect to the database running on port 3306:

scala> import slick.driver.MySQLDriver.simple._
import slick.driver.MySQLDriver.simple._

scala> val db = Database.forURL(
  "jdbc:mysql://127.0.0.1:3306/test",
  driver="com.mysql.jdbc.Driver"
)
db: slick.driver.MySQLDriver.backend.DatabaseDef = slick.jdbc.JdbcBackend$DatabaseDef@3632d1dd

If you have read the previous chapter, you will recognize the first argument as a JDBC-style URL. The URL starts by defining a protocol, in this case, jdbc:mysql, followed by the IP address and port of the database server, followed by the database name (test, here).

The second argument to forURL is the class name of the JDBC driver. This driver is imported at runtime using reflection. Note that the driver specified here must match the Slick driver imported statically.

Having defined the database, we can now use it to create a connection:

scala> db.withSession { implicit session =>
  // do something useful with the database
  println(session)
}
scala.slick.jdbc.JdbcBackend$BaseSession@af5a276

Slick functions that require access to the database take a Session argument implicitly: if a Session instance marked as implicit is available in scope, they will use it. Thus, marking session as implicit saves us from having to pass it explicitly every time we run an operation on the database.

If you have read the previous chapter, you will recognize that Slick deals with the need to close connections with the loan pattern: a database connection is created in the form of a session object and passed temporarily to the client. When the client code returns, the session is closed, ensuring that all opened connections are closed. The client code is therefore spared the responsibility of closing the connection.
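
The general shape of the loan pattern is easy to sketch outside Slick. The withConnection helper below is our own hypothetical example, not a Slick method:

// A minimal sketch of the loan pattern, not Slick's actual implementation:
// the helper owns the resource's lifecycle and "loans" it to the caller's block.
def withConnection[T](url: String)(block: java.sql.Connection => T): T = {
  val connection = java.sql.DriverManager.getConnection(url)
  try {
    block(connection)  // the client borrows the connection...
  } finally {
    connection.close() // ...and it is closed even if the block throws
  }
}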

The loan pattern is very useful in production code, but it can be somewhat cumbersome in the shell. Slick lets us create a session explicitly as follows:

scala> implicit val session = db.createSession
session: slick.driver.MySQLDriver.backend.Session = scala.slick.jdbc.JdbcBackend$BaseSession@2b775b49

scala> session.close

Creating tables

Let's use our new connection to create the transaction table in the database. We can access methods to create and drop tables using the ddl attribute on our TableQuery[Transactions] instance:

scala> db.withSession { implicit session =>
  Tables.transactions.ddl.create
}

If you jump into a mysql shell, you will see that a transactions table has been created:

mysql> describe transactions ;
+------------------------+--------------+------+-----+
| Field                  | Type         | Null | Key |
+------------------------+--------------+------+-----+
| id                     | int(11)      | NO   | PRI |
| candidate              | varchar(254) | NO   |     |
| contributor            | varchar(254) | NO   |     |
| contributor_state      | varchar(2)   | NO   |     |
| contributor_occupation | varchar(254) | YES  |     |
| amount                 | bigint(20)   | NO   |     |
| date                   | date         | NO   |     |
+------------------------+--------------+------+-----+
7 rows in set (0.01 sec)

The ddl attribute also includes a drop method to drop the table. Incidentally, ddl stands for "data-definition language" and is commonly used to refer to the parts of SQL relevant to schema and constraint definitions.
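
For instance, to start again from an empty table, you can drop it and recreate it from the schema definition (this deletes all the data in the table, so use it with care):

db.withSession { implicit session =>
  Tables.transactions.ddl.drop   // issue DROP TABLE transactions
  Tables.transactions.ddl.create // recreate it from the Slick schema
}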

Inserting data

Slick TableQuery instances let us interact with SQL tables with an interface similar to Scala collections.

Let's create a transaction first. We will pretend that a donation occurred on the 22nd of June, 2010. Unfortunately, the code to create dates in Scala and pass these to JDBC is particularly clunky. We first create a java.util.Date instance, which we must then convert to a java.sql.Date to use in our newly created transaction:

scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat

scala> val date = new SimpleDateFormat("dd-MM-yyyy").parse("22-06-2010")
date: java.util.Date = Tue Jun 22 00:00:00 BST 2010

scala> val sqlDate = new java.sql.Date(date.getTime())
sqlDate: java.sql.Date = 2010-06-22

scala> val transaction = Transaction(
  None, "Obama, Barack", "Doe, John", "TX", None, 200, sqlDate
)
transaction: Transaction = Transaction(None,Obama, Barack,Doe, John,TX,None,200,2010-06-22)

Much of the interface provided by the TableQuery instance mirrors that of a mutable list. To insert a single row in the transaction table, we can use the += operator:

scala> db.withSession {
  implicit session => Tables.transactions += transaction
}
Int = 1

Under the hood, this will create a JDBC prepared statement and run this statement's executeUpdate method.
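
For comparison with the previous chapter, the work Slick saves us looks roughly like the following hand-written JDBC. This is a simplified sketch, assuming an open java.sql.Connection called connection; it is not Slick's actual generated code:

// Roughly equivalent to a single += on the transactions table, written by hand
val statement = connection.prepareStatement(
  "INSERT INTO transactions " +
  "(candidate, contributor, contributor_state, contributor_occupation, amount, date) " +
  "VALUES (?, ?, ?, ?, ?, ?)")
statement.setString(1, transaction.candidate)
statement.setString(2, transaction.contributor)
statement.setString(3, transaction.contributorState)
statement.setString(4, transaction.contributorOccupation.orNull)
statement.setLong(5, transaction.amount)
statement.setDate(6, transaction.date)
statement.executeUpdate()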

If you are inserting many rows at a time, you should use Slick's bulk insert operator, ++=. This takes a List[Transaction] as input and inserts all the transactions in a single batch, taking advantage of JDBC's addBatch and executeBatch functionality.

Let's insert all the FEC transactions so that we have some data to play with when running queries in the next section. We can load an iterator of transactions for Ohio by calling the following:

scala> val transactions = FECData.loadOhio.transactions
transactions: Iterator[Transaction] = non-empty iterator

We can also load the transactions for the whole of the United States:

scala> val transactions = FECData.loadAll.transactions
transactions: Iterator[Transaction] = non-empty iterator

To avoid materializing all the transactions in one fell swoop, which could exceed our computer's available memory, we will take batches of transactions from the iterator and insert them:

scala> val batchSize = 100000
batchSize: Int = 100000

scala> val transactionBatches = transactions.grouped(batchSize)
transactionBatches: transactions.GroupedIterator[Transaction] = non-empty iterator

An iterator's grouped method splits the iterator into batches. It is useful to split a long collection or iterator into manageable batches that can be processed one after the other. This is important when integrating or processing large datasets.
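
Here is grouped in isolation, on a small iterator:

// grouped(3) yields batches of at most three elements; the last batch may be shorter
val smallBatches = (1 to 10).iterator.grouped(3)
smallBatches.foreach(batch => println(batch.mkString(", ")))
// prints:
// 1, 2, 3
// 4, 5, 6
// 7, 8, 9
// 10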

All that we have to do now is iterate over our batches, inserting them into the database as we go:

scala> db.withSession { implicit session =>
  transactionBatches.foreach { 
    batch => Tables.transactions ++= batch.toList
  }
}

While this works, it is sometimes useful to see progress reports when doing long-running integration processes. As we have split the integration into batches, we know (to the nearest batch) how far into the integration we are. Let's print the progress information at the beginning of every batch:

scala> db.withSession { implicit session =>
  transactionBatches.zipWithIndex.foreach { 
    case (batch, batchNumber) =>
      println(s"Processing row ${batchNumber*batchSize}")
      Tables.transactions ++= batch.toList
  }
}
Processing row 0
Processing row 100000
...

We use the .zipWithIndex method to transform our iterator over batches into an iterator of (batch, current index) pairs. In a full-scale application, the progress information would probably be written to a log file rather than to the screen.
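
The zipWithIndex method is not specific to database work; on any iterator, it simply pairs each element with its zero-based position:

// Each element is paired with its index, which we use here as a progress counter
Iterator("first batch", "second batch", "third batch").zipWithIndex.foreach {
  case (batch, index) => println(s"processing $batch (number $index)")
}
// processing first batch (number 0)
// processing second batch (number 1)
// processing third batch (number 2)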

Slick's well-designed interface makes inserting data very intuitive, integrating well with native Scala types.

Querying data

In the previous section, we used Slick to insert donation data into our database. Let's explore this data now.

When defining the Transactions class, we defined a TableQuery object, transactions, that acts as the handle for accessing the transaction table. It exposes an interface similar to Scala iterators. For instance, to see the first five elements in our database, we can call take(5):

scala> db.withSession { implicit session =>
  Tables.transactions.take(5).list
}
List[Tables.Transactions#TableElementType] = List(Transaction(Some(1),Obama, Barack,Doe, ...

Internally, Slick implements the .take method using a SQL LIMIT. We can, in fact, get the SQL statement using the .selectStatement method on the query:

scala> db.withSession { implicit session =>
  println(Tables.transactions.take(5).selectStatement)
}
select x2.`id`, x2.`candidate`, x2.`contributor`, x2.`contributor_state`, x2.`contributor_occupation`, x2.`amount`, x2.`date` from (select x3.`date` as `date`, x3.`contributor` as `contributor`, x3.`amount` as `amount`, x3.`id` as `id`, x3.`candidate` as `candidate`, x3.`contributor_state` as `contributor_state`, x3.`contributor_occupation` as `contributor_occupation` from `transactions` x3 limit 5) x2

Our Slick query is made up of the following two parts:

  • .take(n): This part is called the invoker. Invokers build up the SQL statement but do not actually fire it to the database. You can chain many invokers together to build complex SQL statements (see the sketch after this list).
  • .list: This part sends the statement prepared by the invokers to the database and converts the results to Scala objects. It takes a session argument, possibly implicitly.
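
As a sketch of how invokers chain (assuming the transactions table populated earlier in this chapter), the following builds a statement with several invokers and only fires it when .list is called:

db.withSession { implicit session =>
  // Invokers compose into a single SQL statement...
  val query = Tables.transactions
    .filter(_.candidate === "Obama, Barack") // WHERE candidate = 'Obama, Barack'
    .sortBy(_.amount.desc)                   // ORDER BY amount DESC
    .take(10)                                // LIMIT 10

  // ...which is only sent to the database when we call .list
  val largestDonations = query.list
}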