Enumerator

Like the Iteratee, an Enumerator is defined through a trait and backed by a companion object of the same name:

trait Enumerator[E] {
  parent =>
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
  ...
}
object Enumerator {
  def apply[E](in: E*): Enumerator[E] = in.length match {
    case 0 => Enumerator.empty
    case 1 => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC {
        case Step.Cont(k) => k(Input.El(in.head))
        case _ => i
      }
    }
    case _ => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i)
    }
  }
  ...
}

Observe that the apply method of the trait and its companion object are different. The apply method of the trait accepts Iteratee[E, A] and returns Future[Iteratee[E, A]], while that of the companion object accepts a sequence of type E and returns an Enumerator[E].

Now, let's define a simple data flow using the companion object's apply method. As a first example, we count the characters in a line that has been split into words (a Seq[String]):

  val line: String = "What we need is not the will to believe, but the wish to find out."
  val words: Seq[String] = line.split(" ")

  val src: Enumerator[String] = Enumerator(words: _*)

  val sink: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + y.length)
  val flow: Future[Iteratee[String, Int]] = src(sink)

  val result: Future[Int] = flow.flatMap(_.run)

The result value is a Future[Int]; the actual count becomes available once that Future completes.

In the preceding code snippet, we got the result by following these steps:

  1. Building an Enumerator using the companion object's apply method:
    val src: Enumerator[String] = Enumerator(words: _*)
  2. Getting Future[Iteratee[String, Int]] by binding the Enumerator to an Iteratee:
    val flow: Future[Iteratee[String, Int]] = src(sink)
  3. Running the Iteratee held by Future[Iteratee[String, Int]] and flattening the outcome into a Future[Int]:
    val result: Future[Int] = flow.flatMap(_.run)
  4. Fetching the result from the Future[Int] once it completes.
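
Step 4 can be handled in more than one way. The following is a minimal sketch that uses only the Scala standard library. The FetchResult object and its countCharacters method are hypothetical stand-ins: countCharacters produces the same kind of Future[Int] as flow.flatMap(_.run), so that the sketch runs without Play on the classpath. It shows a non-blocking callback and a blocking Await, the latter being reasonable only in tests or scripts.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FetchResult {
  val line: String = "What we need is not the will to believe, but the wish to find out."

  // Hypothetical stand-in for `flow.flatMap(_.run)`: sums the word lengths asynchronously.
  def countCharacters(words: Seq[String]): Future[Int] =
    Future(words.map(_.length).sum)

  // Blocking variant: waits up to two seconds for the Future to complete.
  def countBlocking(): Int =
    Await.result(countCharacters(line.split(" ").toSeq), 2.seconds)

  def main(args: Array[String]): Unit = {
    // Non-blocking variant: the callback runs when the Future completes.
    countCharacters(line.split(" ").toSeq).onComplete {
      case Success(count) => println(s"characters: $count")
      case Failure(e)     => println(s"failed: ${e.getMessage}")
    }
    println(countBlocking())
  }
}
```

In a Play action, the Future would normally be mapped into a response rather than awaited, so that no request thread is blocked.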

Thankfully, Play provides a shortcut method that merges steps 2 and 3, so that we don't have to repeat the same process every time. The method is named |>>>. Using it, our code is reduced to this:

val src: Enumerator[String] = Enumerator(words: _*)
val sink: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + y.length)
val result: Future[Int] = src |>>> sink

Why use this when we could simply use the methods of the data type? In this case, for instance, String's length method would give the same value (once whitespace is ignored).

In this example, the data arrives as a single String, but that will not always be the case. We also need ways to process continuous data, such as a file upload or data feeds from various networking sites.

For example, suppose our application receives heartbeats at a fixed interval from all the devices (such as cameras, thermometers, and so on) connected to it. We can simulate a data stream using the Enumerator.generateM method:

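// Promise here is Play's helper, not scala.concurrent.Promise
import play.api.libs.concurrent.Promise
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.duration._

val dataStream: Enumerator[String] = Enumerator.generateM {
  Promise.timeout(Some("alive"), 100 millis)
}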

In the preceding snippet, the "alive" String is produced roughly every 100 milliseconds. The function passed to the generateM method is called whenever the Iteratee bound to the Enumerator is in the Cont state, and the stream ends if the function yields None. This method is used internally to build enumerators and can come in handy when we want to test how our code processes an expected data stream.
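
To see that generateM is only called while the Iteratee is in the Cont state, we can bound the stream. The following is a sketch under a few assumptions: the HeartbeatSample object and its members are hypothetical names, and a plain Future with Thread.sleep stands in for Promise.timeout so that only the play-iteratees library is needed, not a running Play application. Enumeratee.take limits the stream to three elements and Iteratee.getChunks collects them into a List.

```scala
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object HeartbeatSample {
  // Stand-in for Promise.timeout(Some("alive"), 100 millis): emits "alive" after a short delay.
  def heartbeat(): Future[Option[String]] =
    Future { Thread.sleep(100); Option("alive") }

  def dataStream: Enumerator[String] = Enumerator.generateM[String](heartbeat())

  // Take three heartbeats; once take(3) is Done, generateM stops calling heartbeat().
  def firstThree(): List[String] = {
    val collected: Future[List[String]] =
      dataStream &> Enumeratee.take[String](3) |>>> Iteratee.getChunks[String]
    Await.result(collected, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(firstThree())
}
```

Without the take(3) step, the fold from the earlier example would never finish, because this stream has no natural end.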

An Enumerator can be created from a file, InputStream, or OutputStream. Enumerators can be concatenated or interleaved. The Enumerator API is documented at https://www.playframework.com/documentation/2.3.x/api/scala/index.html#play.api.libs.iteratee.Enumerator$.
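
As an illustration of combining enumerators, the sketch below concatenates and interleaves two small in-memory enumerators. The CombineSample object and its members are hypothetical names; andThen (also written >>>) and interleave (also written >-) are methods of the Enumerator trait, and Iteratee.getChunks gathers everything that was emitted.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CombineSample {
  private def first: Enumerator[String] = Enumerator("a", "b")
  private def second: Enumerator[String] = Enumerator("c", "d")

  // Concatenation: everything from `first`, then everything from `second`.
  def concatenated(): List[String] =
    Await.result(first.andThen(second) |>>> Iteratee.getChunks[String], 2.seconds)

  // Interleaving: elements from both sources, in whatever order they arrive.
  def interleaved(): List[String] =
    Await.result(first.interleave(second) |>>> Iteratee.getChunks[String], 2.seconds)

  def main(args: Array[String]): Unit = {
    println(concatenated())
    println(interleaved())
  }
}
```

Concatenation keeps the order of its sources, whereas the order of an interleaved stream depends on timing and should not be relied upon.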

Using the Concurrent object

The Concurrent object is a helper that provides utilities for using Iteratees, Enumerators, and Enumeratees concurrently. Two of its important methods are:

  • unicast: It is useful when sending data to a single Iteratee.
  • broadcast: It facilitates sending the same data to multiple Iteratees concurrently.

Unicast

For example, the character count from the previous section can be implemented with unicast as follows (here the whole line is pushed as a single chunk, so spaces are counted too):

  val unicastSrc = Concurrent.unicast[String](
    channel => {
      channel.push(line)
      channel.end() // close the channel so that the fold can complete
    }
  )

  val unicastResult: Future[Int] = unicastSrc |>>> sink

The unicast method accepts the onStart, onComplete, and onError handlers. In the preceding code snippet, we have provided only the onStart handler, which is mandatory; the other two have default values. The signature of unicast is this:

def unicast[E](onStart: (Channel[E]) ⇒ Unit,
    onComplete: ⇒ Unit = (),
    onError: (String, Input[E]) ⇒ Unit = (_: String, _: Input[E]) => ())(implicit ec: ExecutionContext): Enumerator[E] {…}

So, to add a log for errors, we can define the onError handler as follows:

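val unicastSrc2 = Concurrent.unicast[String](
  channel => {
    channel.push(line)
    channel.end() // close the channel once the data has been pushed
  },
  // the second argument is the Input[String] that could not be consumed
  onError = { (msg, input) => Logger.error(s"encountered $msg for $input") }
)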

Now, let's see how broadcast works.

Broadcast

The broadcast[E] method creates an enumerator and a channel and returns them as an (Enumerator[E], Channel[E]) tuple. The enumerator and channel thus obtained can be used to broadcast data to multiple Iteratees:

  val (broadcastSrc: Enumerator[String], channel: Concurrent.Channel[String]) = Concurrent.broadcast[String]
  private val vowels: Seq[Char] = Seq('a', 'e', 'i', 'o', 'u')

  // Keeps only the vowels, ignoring case
  def getVowels(str: String): String =
    str.filter(c => vowels.contains(c.toLower))

  // Keeps only the letters that are not vowels, so that punctuation is not counted
  def getConsonants(str: String): String =
    str.filter(c => c.isLetter && !vowels.contains(c.toLower))

  val vowelCount: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + getVowels(y).length)

  val consonantCount: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + getConsonants(y).length)

  val vowelInfo: Future[Int] = broadcastSrc |>>> vowelCount
  val consonantInfo: Future[Int] = broadcastSrc |>>> consonantCount

  words.foreach(w => channel.push(w))
  channel.end()

  vowelInfo onSuccess { case count => println(s"vowels:$count")}
  consonantInfo onSuccess { case count => println(s"consonants:$count")}