http4s – streaming HTTP

The implementation of the HTTP interface in our project is based on the http4s (https://http4s.org) library. http4s is built on top of FS2 and the cats-effect IO type, so it interplays nicely with the persistence layer we implemented with doobie. With http4s, it is possible to build functional server-side services using a high-level DSL, as well as to use it on the client side to call HTTP APIs. We will use the client functionality to build an integration test for our API later in this chapter.

The server side is represented by HttpService[F], which is essentially a mapping from Request to F[Response]; in our case, F is the cats-effect IO. The http4s DSL helps to construct such RESTful services using pattern matching.

This is how it looks in practice. First, we need the following imports to bring fs2, IO, the http4s DSL, and circe into scope:

import cats.effect.IO
import fs2.Stream
import org.http4s._
import org.http4s.dsl.Http4sDsl
import org.http4s.circe._
import org.http4s.headers.`Content-Type`
import io.circe.generic.auto._
import io.circe.syntax._

With these imports in place, we can start to build up our service definition:

class Service(repo: Repository) extends Http4sDsl[IO] { ... }

The service is given a database repository as a parameter. 

The routes are defined separately for each combination of HTTP verb and URL template. We start with the definition of the service value, built with the HttpService constructor, which takes a partial function from request to response:

val service: HttpService[IO] = HttpService[IO] {

Then we follow with the simple route for article deletion:

  case DELETE -> Root / "articles" / name if name.nonEmpty =>
    val repoResult: IO[Boolean] = repo.deleteArticle(name)
    val toResponse: Boolean => IO[Response[IO]] = if (_) NoContent() else NotFound()
    val response: IO[Response[IO]] = repoResult.flatMap(toResponse)
    response

Here we are using the http4s DSL to deconstruct the Request into parts and pattern-match against these parts. The -> object extracts the method and the path from the request, and the / class allows us to represent the concatenation of subpaths of the request URL (there is also /:, which matches the URL from the point of application to the end of the URL). The pattern match itself is just a normal Scala case, hence we can use its full power. In this case, we bind the last part of the URL to name and add a guard to make sure the path only matches if name is not empty (because we don't want to have anonymous articles in our shop!).
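To make the role of these extractors more concrete, here is a simplified, self-contained model of the idea in plain Scala. It is a sketch only and not the actual http4s implementation: the names PathDslSketch, Request, parse, and route are made up for this illustration, and the real library types carry far more information.

```scala
// Simplified model of path extractors; NOT the real http4s implementation.
object PathDslSketch {
  sealed trait Method
  case object GET extends Method
  case object DELETE extends Method

  // A path is either the root or a parent path followed by one more segment.
  sealed trait Path
  case object Root extends Path
  final case class /(parent: Path, child: String) extends Path

  // Hypothetical, minimal request: just a method and a path.
  final case class Request(method: Method, path: Path)

  // `->` splits a request into its method and its path.
  object -> {
    def unapply(req: Request): Option[(Method, Path)] =
      Some((req.method, req.path))
  }

  // Turns "/articles/scala" into Root / "articles" / "scala".
  def parse(url: String): Path =
    url.split('/').filter(_.nonEmpty).foldLeft(Root: Path)((p, s) => new /(p, s))

  // Because `/` binds tighter than `->`, the first pattern reads as
  // DELETE -> ((Root / "articles") / name).
  def route(req: Request): String = req match {
    case DELETE -> Root / "articles" / name if name.nonEmpty => s"delete $name"
    case GET -> Root / "inventory"                           => "inventory"
    case _                                                   => "not found"
  }
}
```

With this model, route(Request(DELETE, parse("/articles/scala"))) evaluates to "delete scala", while a DELETE on "/articles" falls through to the default case because the pattern requires one more path segment.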

The expected result of the function is the IO[Response[IO]] type. Luckily, the return type of the deleteArticle method of our repository is IO[Boolean], so we can just flatMap the returned boolean value into a response inside of an IO. In this case, we don't want to respond with a body, but just inform the caller about the success of the operation, which is represented by the respective response codes: 204 No Content and 404 Not Found. http4s provides nice constructors for this, with a somewhat verbose result type: IO[Response[IO]]. In our case, we define a function from Boolean to this type and use it to flatMap the result of the repository call, which leaves us with IO[Response[IO]] as the end result, exactly the type we are expected to return.

Of course, all of this logic can be written in a succinct manner. Here is an example for the API call to create an article:

  case POST -> Root / "articles" / name if name.nonEmpty =>
    repo.createArticle(name).flatMap { if (_) NoContent() else Conflict() }

The approach is exactly the same as the one we used for article deletion.

The API we're building is not a principled RESTful API. For this example to be a valid level two API (in terms of the Richardson Maturity Model), we also need to implement a GET call that retrieves a representation of an individual article. This can be done by adding a corresponding method to the repository and a case to the service. The implementation is left to the reader as an exercise.
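One possible shape of such a GET route is sketched below. It is only an illustration under several assumptions: the ArticleLookup trait and its getArticle method are hypothetical (the chapter's Repository does not define them), the response body (a JSON object with the stock count) is an arbitrary choice, and the code targets the same HttpService-based http4s API used throughout this section.

```scala
import cats.effect.IO
import io.circe.syntax._
import org.http4s._
import org.http4s.circe._
import org.http4s.dsl.Http4sDsl

// Hypothetical repository slice; getArticle is NOT part of the chapter's Repository.
trait ArticleLookup {
  // Yields the stock count if the article exists, None otherwise.
  def getArticle(name: String): IO[Option[Int]]
}

class ArticleRoutes(repo: ArticleLookup) extends Http4sDsl[IO] {
  val service: HttpService[IO] = HttpService[IO] {
    case GET -> Root / "articles" / name if name.nonEmpty =>
      repo.getArticle(name).flatMap {
        case Some(count) => Ok(Map(name -> count).asJson) // 200 with a JSON body
        case None        => NotFound()                    // 404 for unknown articles
      }
  }
}
```

The structure mirrors the deletion route: the repository result is flatMapped into a response, only this time the successful branch carries a body.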

Now that we have created a few articles in the repository, we would like to be able to retrieve the current state of it. We can implement it as follows:

  case GET -> Root / "inventory" =>
    val inventory: Stream[IO, Inventory] = repo.getInventory
    renderInventory(inventory)

The above pattern match is straightforward, and so is the call to the getInventory method of the repository. However, it returns a result of the Stream[IO, Inventory] type, which we need to convert to the response type expected by HttpService[IO]. http4s has the concept of an EntityEncoder for this.

Here is the corresponding implementation:

private def renderInventory(inventory: Stream[IO, Inventory]): IO[Response[IO]] = {
  val json: Stream[IO, String] = inventory.map(_.asJson.noSpaces)
  val response: IO[Response[IO]] =
    Ok(json, `Content-Type`(MediaType.`application/json`))
  response
}

Here, we prepare the inventory to be represented as an HTTP response by converting the returned Map[String, Int] to JSON. We rely on circe (https://github.com/circe/circe) to perform automatic conversion. Next, the stream is converted to the appropriate response type by the Ok status constructor and an implicit EntityEncoder[IO, String]. We explicitly force the content type of the response to be application/json in order to have it correctly represented in the response.
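The JSON conversion step can be looked at in isolation. The following minimal sketch assumes that Inventory is an alias for Map[String, Int], as the text above suggests; the InventoryJsonSketch object and its render method are illustrative names only.

```scala
import io.circe.syntax._

object InventoryJsonSketch {
  // Assumption: Inventory is an alias for Map[String, Int].
  type Inventory = Map[String, Int]

  // circe provides an Encoder for maps with String keys out of the box,
  // so asJson works here without any hand-written codec.
  def render(inventory: Inventory): String = inventory.asJson.noSpaces
}
```

For example, InventoryJsonSketch.render(Map("scala" -> 3)) produces the compact string {"scala":3}, which is what each element of the stream is mapped to before it is handed to the Ok constructor.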

Finally, we want to provide a way to modify the state of the inventory like we did with the repository. We'll implement two API calls, one for replenishing the inventory and another for purchases. They are implemented similarly, so we'll cover only one of them; the other can be found in the GitHub repository. Here is the implementation for the restock call:

  case req @ POST -> Root / "restock" =>
    val newState = for {
      purchase <- Stream.eval(req.decodeJson[Restock])
      _ <- repo.updateStock(purchase.inventory)
      inventory <- repo.getInventory
    } yield inventory
    renderInventory(newState)

We need the request itself in order to read its body, so we bind it to the req variable in the pattern match. Next, we decode the JSON body of the request and map it to our model. Here we rely on circe to do the heavy lifting again. The updateStock repository method returns a stream, so we need to bring our parameter into the same context in order to use it nicely in the for comprehension. We do this by wrapping the result of the decoding in Stream.eval.

Then we call the repository and provide the required changes in the form of Inventory. This method returns Stream[IO, Either[Throwable, Unit]], so we ignore the result (it will short-circuit the for comprehension in the case of an error). Finally, we read the new state of the repository and render it for the caller as before.
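The chaining described above can be tried without a database or an HTTP request. The sketch below is an assumption-laden stand-in: RestockSketch, its in-memory stock variable, and the decoded value (which plays the role of req.decodeJson[Restock]) are hypothetical, and running the stream as shown assumes a cats-effect version before 3, where unsafeRunSync() needs no implicit runtime.

```scala
import cats.effect.IO
import fs2.Stream

object RestockSketch {
  // Hypothetical in-memory stand-in for the database-backed repository.
  private var stock: Map[String, Int] = Map("scala" -> 1)

  // Adds the given quantities to the stock; failures surface as Left values.
  def updateStock(delta: Map[String, Int]): Stream[IO, Either[Throwable, Unit]] =
    Stream.eval(IO {
      stock = delta.foldLeft(stock) { case (acc, (name, n)) =>
        acc.updated(name, acc.getOrElse(name, 0) + n)
      }
    }.attempt)

  def getInventory: Stream[IO, Map[String, Int]] = Stream.eval(IO(stock))

  // Stands in for the decoded request body, which is an IO, not a Stream.
  val decoded: IO[Map[String, Int]] = IO.pure(Map("scala" -> 2))

  val newState: Stream[IO, Map[String, Int]] = for {
    purchase  <- Stream.eval(decoded) // lift the IO into the Stream context
    _         <- updateStock(purchase)
    inventory <- getInventory
  } yield inventory
}
```

Nothing happens until the stream is run, for instance with RestockSketch.newState.compile.toList.unsafeRunSync(); only then are the update and the subsequent read executed, in that order.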

Read-after-write is a known database anti-pattern. We used this approach to illustrate how streaming calls can be nicely chained in a for comprehension. In a real project, it might be better to formulate the SQL statements in such a way that the new state is returned immediately by the update.

The service layer is implemented now. We can wire our application together and see how it works.
