Handling streams reactively

Whenever you need to consume and transform streams of data in a web application, such as watching stock updates or monitoring log activity on a service, you need a mechanism to manipulate chunks of data that are pushed from a server to a browser, for instance, using Comet (http://en.wikipedia.org/wiki/Comet_(programming)) or WebSocket (http://en.wikipedia.org/wiki/WebSocket) technologies. The Iteratee pattern available within the Play framework is such a mechanism. It was originally borrowed from the functional language Haskell.

Understanding Iteratees in Play

An Iteratee construct aims at providing a composable and nonblocking way of handling streams produced by its counterpart called Enumerator.

Let's launch a Scala REPL to explore the Iteratee/Enumerator constructs in more detail. To create a new Play project as we have done several times before, notably in Chapter 5, Getting Started with the Play Framework, use the following commands:

> play new ch9samples (then choose Scala as language)
> cd ch9samples
> play console

First, we will remind ourselves how an iteration is done within an imperative language such as Java. The following statements written in Scala describe the use of a mutable variable total that will be updated at each step of the iteration:

scala> val numbers = List(1,4,7,8,10,20)
numbers: List[Int] = List(1, 4, 7, 8, 10, 20)
scala> var total = 0
total: Int = 0
scala> var iterator = numbers.iterator
iterator: Iterator[Int] = non-empty iterator
scala> while (iterator.hasNext) {
           total += iterator.next
       }
scala> total
res2: Int = 50

Tip

As explained in the blog post available at http://mandubian.com/2012/08/27/understanding-play2-iteratees-for-normal-humans/, we need to take care of the following when iterating:

  • The state of the iteration (are there more elements to follow, or is it finished?)
  • A context (here the total accumulator)
  • An action, updating the context, that is, the total += iterator.next

We have seen in Chapter 1, Programming Interactively within Your Project, that we can implement the same operation in a concise and more functional way by using the foldLeft Scala construct in the following way:

scala> List(1,4,7,8,10,20).foldLeft(0){ (total,elem) =>
       total + elem } 
res3: Int = 50
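
The three ingredients listed in the preceding tip (state, context, and action) can also be written out by hand as a recursive function. The following is only a minimal sketch in plain Scala; the name sumAll is hypothetical and chosen for illustration, and foldLeft remains the idiomatic choice:

```scala
import scala.annotation.tailrec

// Threads the accumulator (the context) through every step of the iteration.
@tailrec
def sumAll(remaining: List[Int], total: Int = 0): Int = remaining match {
  case Nil          => total                       // state: finished, return the context
  case head :: tail => sumAll(tail, total + head)  // action: update the context, then continue
}

val totalSum = sumAll(List(1, 4, 7, 8, 10, 20))
```

The same shape, an accumulator plus a function that decides whether to continue or stop, reappears later in this section when we write an Iteratee by hand.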

The foldLeft construct is a powerful construct that is applied to Scala collections such as Lists. If we want to process other forms of input such as a file, a network connection, a database connection, or a flow produced by an Akka actor, then the Enumerator/Iteratee pair comes into play. An Enumerator can be seen as the producer of data (similar to the previous List) and an Iteratee as the consumer of that data, processing each step of the iteration. The preceding example involving the foldLeft method on a List can be rewritten using an Enumerator/Iteratee construct. As the iteratee library is already available within Play, it can be imported directly by using the following commands:

scala> import play.api.libs.iteratee._
import play.api.libs.iteratee._
scala> import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.concurrent.Execution.Implicits._

After importing the iteratee library and a global execution context for the iteratee variables to work with, we can define our first Enumerator as follows:

scala> val enumerator = Enumerator(1,4,7,8,10,20)
enumerator: play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@27a21c85...

The iteratee variable defined as follows indicates the computation step to be performed while accepting an input from the enumerator:

scala> val iteratee = Iteratee.fold(0){ (total, elem:Int) => total + elem }
iteratee: play.api.libs.iteratee.Iteratee[Int,Int] = play.api.libs.iteratee.ContIteratee@e07a406

Combining the enumerator construct with the iteratee construct is a matter of invoking the run method of enumerator that takes the iteratee as an argument:

scala> val result = enumerator.run(iteratee)
result: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@78b5282b

As we have an asynchronous computation, we get back a result as a Future that we can display once it is completed, as follows:

scala> result onComplete println
scala> Success(50)
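
When experimenting in the REPL, it can be convenient to wait for such a Future and obtain the plain value. The following sketch uses only the Scala standard library; the pending value is a stand-in (an assumption for illustration) for the Future[Int] returned by enumerator.run(iteratee):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the Future[Int] produced by running an enumerator over an iteratee.
val pending: Future[Int] = Future { List(1, 4, 7, 8, 10, 20).sum }

// Blocks the calling thread for at most two seconds. This is acceptable in a REPL
// session, but blocking is best avoided in code that handles web requests.
val value: Int = Await.result(pending, 2.seconds)
```

In application code, non-blocking callbacks and combinators such as onComplete, map, and flatMap are usually preferable to Await.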

The enumerator object mentioned previously was an enumerator of integers. We can create producers of data of many different types, such as strings or double values. This is shown in the following code:

scala> val stringEnumerator = Enumerator("one","two","four")
stringEnumerator: play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@1ca7d367
scala> val doubleEnumerator = Enumerator(1.03,2.34,4)
doubleEnumerator: play.api.libs.iteratee.Enumerator[Double] = play.api.libs.iteratee.Enumerator$$anon$19@a8e29a5

To illustrate the creation of an Enumerator from a file, let's add a little text file named samplefile.txt in the root of the current project containing, for instance, the following three lines of text:

Alice
Bob
Charlie

You may use a separate console window to create this file while leaving the REPL running in the original console window. Otherwise, you will have to rerun the import statements. Creating an Enumerator from a file is shown in the following commands:

scala> import java.io.File
import java.io.File
scala> val fileEnumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(new File("./samplefile.txt"))
fileEnumerator: play.api.libs.iteratee.Enumerator[Array[Byte]] = play.api.libs.iteratee.Enumerator$$anon$4@33500f2

Enumerator also provides some useful factory methods. For example, generateM can create a stream of events that are generated at regular intervals. In the following command, a new element is produced each time the Promise object, which contains the current time, times out (every 500 milliseconds):

scala> val dateGenerator: Enumerator[String] = Enumerator.generateM(
         play.api.libs.concurrent.Promise.timeout(
           Some("current time %s".format(new java.util.Date())),
           500
         ))

More generally, we can say that an Enumerator[E] (read enumerator of type E) feeds its consumer with values of type Input[E], which come in three possible kinds:

  • Input.El[E]: It is a chunk of data of type E, for example, an Input.El[LogData] carries a chunk of LogData
  • Input.Empty: It means that the chunk carries no data, for instance, an Enumerator streaming an empty file
  • Input.EOF: It means that the enumerator has reached its end, for instance, an Enumerator construct streaming a file and reaching the end of the file
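
These three cases can be modelled as a small algebraic data type that a consumer handles by pattern matching. The following is a sketch in plain Scala; ToyInput, ToyEl, ToyEmpty, ToyEOF, and describe are hypothetical names introduced for illustration and are not Play's own classes:

```scala
// Toy stand-ins for the three kinds of input; these are NOT Play's classes.
sealed trait ToyInput[+E]
case class ToyEl[+E](elem: E) extends ToyInput[E]   // a chunk carrying data
case object ToyEmpty extends ToyInput[Nothing]      // a chunk carrying no data
case object ToyEOF extends ToyInput[Nothing]        // the end of the stream

// A consumer has to decide what to do for each of the three cases.
def describe[E](input: ToyInput[E]): String = input match {
  case ToyEl(e) => s"element: $e"
  case ToyEmpty => "empty chunk"
  case ToyEOF   => "end of stream"
}

val inputs: List[ToyInput[String]] = List(ToyEl("log line"), ToyEmpty, ToyEOF)
val described: List[String] = inputs.map(i => describe(i))
```

Because the trait is sealed, the compiler can warn when a pattern match forgets one of the cases.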

In addition to the run method used to run an Enumerator over an Iteratee, you can also invoke the apply method of the enumerator directly. Notice in the following two commands the different result types you get depending on how you combine the enumerator and the iteratee:

scala> val result = enumerator.run(iteratee)
result: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1837220f
scala> val result2=enumerator(iteratee)
result2: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] = scala.concurrent.impl.Promise$DefaultPromise@5261b67f

This last Future result contains an Iteratee[Int,Int], that is, an Iteratee[<type contained in chunk>, <result of the iteration>].

scala> val enumerator = Enumerator(1,4,7,8,10,20)
enumerator: play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@7e666ce4

The following Iteratee consumes all the chunks from the enumerator stream and returns them as a List collection:

scala> val chunksIteratee = Iteratee.getChunks[Int]
chunksIteratee: play.api.libs.iteratee.Iteratee[Int,List[Int]] = play.api.libs.iteratee.ContIteratee@53af8d86
scala> val list = enumerator.run(chunksIteratee)
list: scala.concurrent.Future[List[Int]] = scala.concurrent.impl.Promise$DefaultPromise@66e1b41c
scala> list onComplete println
scala> Success(List(1, 4, 7, 8, 10, 20))

The Iteratee examples that we have seen so far rely on ready-made constructors such as fold, which works much like the foldLeft method of Scala collections. Let's try to build a more sophisticated Iteratee: one that, for instance, selects the words containing the letter E out of the enumerator stream. This can be done using the following code:

scala> def wordsWithE: Iteratee[String,List[String]] = {
         def step(total: List[String])(input: Input[String]): Iteratee[String,List[String]] = input match {
           case Input.EOF | Input.Empty => Done(total, Input.EOF)
           case Input.El(elem) =>
             if (elem.contains("E")) Cont[String,List[String]](i => step(elem :: total)(i))
             else Cont[String,List[String]](i => step(total)(i))
         }
         Cont[String,List[String]](i => step(List[String]())(i))
       }
wordsWithE: play.api.libs.iteratee.Iteratee[String,List[String]]

The recursive step function uses a total accumulator, that is, a context to keep some state at each step of the recursion. This is a list of strings containing all the results we are interested in. The second argument to the step function is the new chunk from the enumerator stream that comes up at each step. This chunk is matched against the possible states; if either the chunk is empty or we have reached the end of the stream, we return the accumulated result in a Done state. Otherwise, we handle the incoming element. If the element satisfies the if condition, then we add it to the accumulator and invoke the next step in our recursion as part of a Cont (continue) state. Otherwise, we just invoke the next step without saving the element.

Finally, the last line of wordsWithE initiates the recursion: it returns a Cont state that will call the step function with an empty accumulator as soon as the first element of the stream arrives. Applying this newly-defined Iteratee on a simple enumerator looks like the following commands:

scala> val output = Enumerator("ONE","TWO","THREE") run wordsWithE
output: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@50e0cc83
scala> output onComplete println
scala> Success(List(THREE, ONE))

Every computation step performed on an incoming string either prepends that string to the total accumulator or ignores it, depending on whether it matches the if condition or not. In this example, it simply checks that the word contains at least one E. Since each match is added at the head of the list, the result lists the matching words in reverse order of arrival.
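
To see what the Cont and Done states mean mechanically, the step function of wordsWithE can be replayed against a stripped-down, synchronous model written in plain Scala. This is only a sketch under simplifying assumptions: ToyInput, ToyIteratee, ToyCont, ToyDone, and feed are hypothetical names, the empty-chunk case is left out, and Play's real Iteratee is asynchronous and considerably richer:

```scala
// Toy inputs: either an element or the end of the stream.
sealed trait ToyInput[+E]
case class ToyEl[+E](elem: E) extends ToyInput[E]
case object ToyEOF extends ToyInput[Nothing]

// A toy consumer is either finished with a result, or waiting for more input.
sealed trait ToyIteratee[E, A]
case class ToyDone[E, A](result: A) extends ToyIteratee[E, A]
case class ToyCont[E, A](next: ToyInput[E] => ToyIteratee[E, A]) extends ToyIteratee[E, A]

// Same shape as the step function of wordsWithE: accumulator first, then the input.
def step(total: List[String])(input: ToyInput[String]): ToyIteratee[String, List[String]] =
  input match {
    case ToyEOF => ToyDone[String, List[String]](total)
    case ToyEl(elem) =>
      if (elem.contains("E")) ToyCont[String, List[String]](i => step(elem :: total)(i))
      else ToyCont[String, List[String]](i => step(total)(i))
  }

// Feeds every element, then the end-of-stream marker, and extracts the result.
// This plays the role of a synchronous stand-in for the run method.
def feed[E, A](it: ToyIteratee[E, A], elems: List[E]): Option[A] = it match {
  case ToyDone(result) => Some(result)
  case ToyCont(next) => elems match {
    case head :: tail => feed(next(ToyEl(head)), tail)
    case Nil => next(ToyEOF) match {
      case ToyDone(result) => Some(result)
      case ToyCont(_)      => None // the consumer still wants input after the end
    }
  }
}

val start: ToyIteratee[String, List[String]] =
  ToyCont[String, List[String]](i => step(Nil)(i))
val selected = feed(start, List("ONE", "TWO", "THREE"))
```

Each call to next hands one input to the consumer and gets back its new state, which is how the accumulator travels from one step to the next without any mutable variable.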

Adapting Enumerator with Enumeratee

It might happen that the data consumed by an Iteratee does not match the input produced by an Enumerator. The role of an Enumeratee is to be an adapter that sits in between the Enumerator and Iteratee to transform the incoming data before feeding the Iteratee.

As an example of a simple transformation from one Enumerator to another, we could, for instance, define an Enumeratee that converts an input of the type String to Int, as illustrated by the following commands:

scala> val summingIteratee = Iteratee.fold(0){ (total, elem:Int) => total + elem }
summingIteratee: play.api.libs.iteratee.Iteratee[Int,Int] = play.api.libs.iteratee.ContIteratee@196fad1a
scala> Enumerator("2","5","7") through Enumeratee.map(x => x.toInt) run summingIteratee
res5: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@5ec418a8
scala> res5 onComplete println
scala> Success(14)

The transformation applied by the Enumeratee is the function passed to its map method.
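
The adapter role is comparable to mapping over a plain Scala Iterator before folding it. The following sketch uses only the standard library (it does not involve an Enumeratee) and also illustrates one point to keep in mind: toInt throws a NumberFormatException on non-numeric strings, so an adapter stage may need to decide what to do with malformed input. Here, as an assumption made for illustration, such values are simply dropped:

```scala
import scala.util.Try

// Producer stage: a stream of strings, one of which is not a number.
val raw = Iterator("2", "5", "seven", "7")

// Adapter stage: parse each string, dropping the values that are not integers.
val parsed: Iterator[Int] = raw.flatMap(s => Try(s.toInt).toOption)

// Consumer stage: the same summing fold as before.
val safeSum = parsed.foldLeft(0)(_ + _)
```

Whether dropping, logging, or failing is the right reaction depends on the application; the point is that the decision lives in the adapter rather than in the consumer.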

Adapting the Enumerator can also consist of transforming the input data to a different format without changing the type. Considering wordsWithE that we defined previously, we could apply an Enumeratee that converts all the input data to uppercase so that the consumption of the stream of data by the Iteratee produces a different result than the one obtained without the Enumeratee. The following code illustrates that behavior:

scala> val enumerator = Enumerator("ONE","Two","Three")
scala> enumerator run wordsWithE onComplete println
scala> Success(List(ONE))
scala> enumerator through Enumeratee.map(x=>x.toUpperCase) run wordsWithE onComplete println
scala> Success(List(THREE, ONE))

To summarize, an Enumerator is a producer of a data stream, an Iteratee is a consumer of that data, and an Enumeratee is an adapter between the two. The iteratee pattern has been integrated into the Play Framework as a way to handle streams of data reactively in a web application. In the next section, we are going to build web applications in such a way, by additionally using WebSockets to communicate between the client and the server in both directions.
