FS2 – functional streams

You may be wondering why we have a section about the FS2 library if we're not using it as a component for our application. Well, in fact, we are. It is an underlying building block for the database and HTTP libraries we're using, and therefore it is important to briefly discuss it to give you an understanding of how the other building blocks are connected. 

FS2 is a streaming library that allows us to construct and transform complex streams. The streams in FS2 do not only contain elements, but also can embody effects such as IO. This feature makes it possible to describe almost everything as an FS2 stream. Libraries such as http4s and doobie are built upon this and give a higher-level API to the user. But this API is still a streaming one.

The stream is represented as Stream[F,O], where F describes a possible effect of the stream and O is a type of its elements or output. This definition needs to be given two type parameters in order to fully specify it. If the stream has no effects, it will be pure: Stream[Pure, O].

Let's construct a stream of chars:

val chars: fs2.Stream[fs2.Pure,Char] = Stream.emits(List('a','b','c'))

Pure streams can be converted to the List or Vector without evaluation: chars.toList

Streams with effects can't be converted the same way because of the presence of effects. The effects first need to be reduced to a single effect. At the same time, we need to define how the output of the stream should be dealt with. Finally, we can execute the effect and get the output of the stream. This process is similar to the definition and materialization of the Akka streams we looked at in Chapter 13Basics of Akka Streams. Because we have quite a number of things to define, the syntax is a bit cumbersome, but it reflects the logic we described:

object Test extends App {
import fs2.Stream
import cats.effect.IO // 1
val io: IO[String] = IO { println("IO effect"); "a" * 2 } // 2
val as: Stream[IO, String] = Stream.eval(io) // 3
val c: Stream.ToEffect[IO, String] = as.compile // 4
val v: IO[Vector[String]] = c.toVector // 5
val l: IO[List[String]] = c.to[List] // 6
val d: IO[Unit] = c.drain // 7
val e: IO[Option[String]] = c.last // 8
println(v.unsafeRunSync()) // 9
println(e.unsafeRunSync()) // 10
Stream.eval(IO { 42 }).compile.toList.unsafeRunSync() // 11
}

Let's go over this snippet line by line and look at what is happening. The numbers in the code will correspond to the numbers in the following explanation:

  1. We are using the cats IO as type of our effect.
  2. We define an IO as a by-name parameter to write to the console and return aa.
  3. We eval our IO. This creates a single-element stream.
  4. By compiling the stream, we create its projection to a single effect.
  1. By converting a ToEffect projection to Vector it is compiled to the expected effect type. The process can be thought of as executing a stream of effects and logging emitted results into the desired structure.
  2. We demonstrate another way to define conversion to collection.
  3. drain is used to discard any emitted values and is useful if we are only interested in executing effects.
  4. There are also other possibilities to define what should happen with output elements of the stream, for example, just collecting the last one.
  5. unsafeRunSync() runs the definition, synchronously producing effects and emitting output. This is the first moment anything appears in the console because so far we've just created and modified the definition of the stream.
  6. The definition is immutable and can be shared. Because of this, we can run the same stream description multiple times (with respect to the kind effects).
  7. All of this is usually defined as a one-liner: eval the effect, compile the stream to the single effect, define the type of the output for the elements, run the stream later.

Now let's see how FS2 are utilized by http4s and doobie. We'll start with the database layer as its implementation will guide the structure of other layers.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.235.176