Doobie – functional database access

The database layer in our example is implemented with the Doobie library. Its official website describes it as a pure functional JDBC layer for Scala and Cats. It allows us to abstract existing JDBC functionality in a nice functional way. Let's show how this is done. The library can be found at https://tpolecat.github.io/doobie/. In the following examples, please assume the following imports to be in scope: 

import cats.effect.IO
import fs2.Stream
import doobie._
import doobie.implicits._
import doobie.util.transactor.Transactor
import cats.implicits._

We also need some model classes to persist, and for the purpose of the example, we'll keep the ADT as small as possible:

object Model {
type Inventory = Map[String, Int]
abstract sealed class Operation(val inventory: Inventory)

final case class Purchase(order: Inventory)
extends Operation(order.mapValues(_ * -1))

final case class Restock(override val inventory: Inventory)
extends Operation(inventory)
}

This model will allow us to represent the inventory of our shop as a map with keys referring to the article name and a values denoting number of the respective items in stock. We'll also have two operations that can be applied to the inventory—the Purchase operation will reduce the number of corresponding items and the Restock operation will increase number of respective items by combining our existing stocks together.

Now we can define our repository for this model. We'll do this in the same pure functional way we did before:

class Repository(transactor: Transactor[IO]) { ... }

The repository is given Transactor[IO] as a constructor parameter. The IO in this example is cats.effect.IO. The transactor knows how to work with database connections. It can manage connections in the same logical way a connection pool does. In our implementation, Transactor is used to convert an FS2 Stream[IO, ?] into the IO, which will connect to the database and execute SQL statements if run. Let's see in detail how this is done for article-creation:

def createArticle(name: String): IO[Boolean] = {
val sql: Fragment = sql"INSERT INTO article (name, count) VALUES ($name, 0)" // 1
val update: Update0 = sql.update // 2
val conn: ConnectionIO[Int] = update.run //3
val att: ConnectionIO[Either[Throwable, Int]] = conn.attempt //4
val transact: IO[Either[Throwable, Int]] = att.transact(transactor) // 5
transact.map { // 6
case Right(affectedRows) => affectedRows == 1
case Left(_) => false
}
}

Let's go over this definition line by line to see what is going on here:

  1. We define a Fragment, which is an SQL statement that can include interpolated values. Fragments can be combined together. 
  2. From Fragment, we construct an UpdateUpdate can be used to construct a ConnectionIO later.
  3. We construct a ConnectionIO by calling the run method on  updateConnectionIO is basically an abstraction over the possible operations available on the JDBC connection. 
  4. By calling an attempt method, we're adding error-handling to our ConnectionIO. This is the reason the type parameter of ConnectionIO has changed from Int to Either[Throwable, Int]
  5. By providing a transactor to the transact method, we convert ConnectionIO into IO, which represents a runnable doobie program.
  6. We coerce different sides of Either to a single Boolean value. We expect the number of created rows to be exactly one, in which case the call was a success. If we failed to create a row or if there was an exception thrown, it is a failure.
It would be more appropriate in the erroneous case to differentiate between the unique index or primary key violation and other cases but unfortunately different database drivers have different encoding for that, so it is not possible to provide concise generic implementation.

Other methods in our repository will follow the same pattern. deleteArticle is a one-liner and we don't bother to handle errors in this case (exceptions will bubble up to the upper layers and be propagated to the client if they will be thrown), so we can just check whether the number of affected rows was exactly one:

def deleteArticle(name: String): IO[Boolean] =
sql"DELETE FROM article WHERE name = $name"
.update.run.transact(transactor).map { _ == 1 }

getInventory is a bit different because it needs to return the results of the query:

def getInventory: Stream[IO, Inventory] = {
val query: doobie.Query0[(String, Int)] =
sql"SELECT name, count FROM article".query[(String, Int)]
val stream: Stream[IO, (String, Int)] =
query.stream.transact(transactor)
stream.fold(Map.empty[String, Int])(_ + _)
}

Here, we see that the query is of the doobie.Query0[(String, Int)] type with the type parameter representing the column types of the result. We convert the query to Stream[ConnectionIO, (String, Int)] (an FS2 stream with the ConnectionIO effect type and the tuple as a type of elements) by calling a stream method and then convert ConnectionIO to IO by providing a transactor. At last, we fold elements of the stream into Map, thus constructing the inventory state at the present moment from individual rows.

Updating the inventory has another caveat. We would like to update multiple articles at once so that if there is insufficient supply for some of the articles, we discard the whole purchase.

This is a design decision. We could decide to return a partially-fulfilled order to the client.

The count of every article needs to be updated separately, therefore we need to have multiple update statements running in a single transaction. This is how it is done:

def updateStock(inventory: Inventory): Stream[IO, Either[Throwable, Unit]] = {
val updates = inventory.map { case (name, count) =>
sql"UPDATE article SET count = count + $count WHERE name = $name".update.run
}.reduce(_ *> _)
Stream
.eval(
FC.setAutoCommit(false) *> updates *> FC.setAutoCommit(true)
)
.attempt.transact(transactor)
}

We're given a map of name -> count pairs as a parameter. The first thing we do is to convert each of these pairs into an update operation by mapping over them. This leaves us with a collection of CollectionIO[Int]. We then combine these updates together by using the cats Apply operator, which produces a single CollectionIO[Int].

JDBC by default has auto-commit enabled, which will lead to the effect that our updates in the batch will be executed and committed one by one. This can lead to partially-fulfilled orders. In order to avoid that, we wrap the updates into the stream, which will disable auto-commits before the updates and enable auto-commits again after all of them are executed. We then lift the error-handling of the result and convert it into the runnable IO as before. 

The result of the method is the Stream[IO, Either[Throwable, Unit]] type. The type of the elements of the stream encodes the possibilities to have both updates that weren't possible because there were insufficient articles in the inventory as Left and a successful update as Right .

With these four methods, we actually have all the required basic functionality and can start to use it in the API layer. 

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.143.254.151