Similar to the Iteratee, an Enumerator is also defined through a trait and backed by an object of the same name:
```scala
trait Enumerator[E] {
  parent =>

  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
  ...
}

object Enumerator {
  def apply[E](in: E*): Enumerator[E] = in.length match {
    case 0 => Enumerator.empty
    case 1 => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
        i.pureFoldNoEC {
          case Step.Cont(k) => k(Input.El(in.head))
          case _ => i
        }
    }
    case _ => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
        enumerateSeq(in, i)
    }
  }
  ...
}
```
Observe that the apply methods of the trait and of its companion object are different. The trait's apply method accepts an Iteratee[E, A] and returns a Future[Iteratee[E, A]], while the companion object's apply method accepts a variable number of elements of type E (E*) and returns an Enumerator[E].
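To make the two signatures concrete, here is a minimal sketch. It assumes the play-iteratees 2.3.x library is on the classpath and uses Scala's global ExecutionContext (an assumption; inside a Play application you would normally use Play's own context). The object name ApplyDemo is hypothetical.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object; assumes play-iteratees 2.3.x on the classpath.
object ApplyDemo {
  // Companion-object apply: a sequence of elements in, an Enumerator out.
  val numbers: Enumerator[Int] = Enumerator(1, 2, 3)

  // An Iteratee that sums every Int it is fed.
  val sum: Iteratee[Int, Int] = Iteratee.fold[Int, Int](0)(_ + _)

  // Trait apply: an Iteratee in, a Future of the advanced Iteratee out.
  // numbers(sum) is shorthand for numbers.apply(sum).
  val fed: Future[Iteratee[Int, Int]] = numbers(sum)

  // run sends EOF to the Iteratee and extracts its final value.
  // Blocking with Await is acceptable in a script or test only.
  val total: Int = Await.result(fed.flatMap(_.run), 5.seconds)
}
```

Note that feeding the Iteratee and extracting its value are separate steps: the Enumerator only advances the Iteratee, and run produces the result.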
Now, let's define a simple data flow using the companion object's apply method: we will count the characters in a given line of text, which we first split into a Seq[String] of words:
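```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // any ExecutionContext will do

val line: String = "What we need is not the will to believe, but the wish to find out."
val words: Seq[String] = line.split(" ")

// Source: emits each word in turn
val src: Enumerator[String] = Enumerator(words: _*)

// Sink: adds up the length of every word it receives
val sink: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + y.length)

// Binding the source to the sink yields the Iteratee's state after all words are fed
val flow: Future[Iteratee[String, Int]] = src(sink)

// run signals EOF and extracts the accumulated count
val result: Future[Int] = flow.flatMap(_.run)
```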
The variable result has the type Future[Int]. We can now process it to get the actual count.
In the preceding code snippet, we got the result by following these steps:

1. Building an Enumerator using the companion object's apply method:

```scala
val src: Enumerator[String] = Enumerator(words: _*)
```

2. Getting a Future[Iteratee[String, Int]] by binding the Enumerator to an Iteratee:

```scala
val flow: Future[Iteratee[String, Int]] = src(sink)
```

3. Flattening the Future[Iteratee[String, Int]] and processing it:

```scala
val result: Future[Int] = flow.flatMap(_.run)
```

4. Fetching the count from the resulting Future[Int].

Thankfully, Play provides a shortcut method that merges steps 2 and 3, so we don't have to repeat the same process every time. The method is represented by the |>>> symbol. Using the shortcut method, our code is reduced to this:
```scala
val src: Enumerator[String] = Enumerator(words: _*)
val sink: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + y.length)
val result: Future[Int] = src |>>> sink
```
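Conceptually, |>>> binds the Iteratee with apply and then calls flatMap(_.run) on the result. The following sketch computes the count both ways so they can be compared. It assumes play-iteratees 2.3.x on the classpath and Scala's global ExecutionContext; the object name ShortcutDemo is hypothetical, and the blocking Await calls are only appropriate in a script or test, not in request-handling code.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object; assumes play-iteratees 2.3.x on the classpath.
object ShortcutDemo {
  val line: String = "What we need is not the will to believe, but the wish to find out."
  val words: Seq[String] = line.split(" ").toSeq

  val src: Enumerator[String] = Enumerator(words: _*)
  val sink: Iteratee[String, Int] = Iteratee.fold[String, Int](0)((x, y) => x + y.length)

  // Steps 2 and 3 written out by hand
  val longHand: Future[Int] = src(sink).flatMap(_.run)

  // The same flow using the |>>> shortcut
  val shortHand: Future[Int] = src |>>> sink

  // Block only for demonstration purposes
  val longCount: Int = Await.result(longHand, 5.seconds)
  val shortCount: Int = Await.result(shortHand, 5.seconds)
}
```

Both counts should be 52: the 50 letters of the sentence plus its comma and full stop, with the 14 spaces excluded because the line was split on them.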
Why go to this trouble when we could simply use the methods of the data type? In this case, couldn't we use the length method of String to get the same value (ignoring whitespace)?
In this example, we receive the data as a single String, but this will not always be the case. We also need ways to process continuous data, such as file uploads or feeds from various networking sites.
For example, suppose our application receives heartbeats at a fixed interval from all the devices (such as cameras, thermometers, and so on) connected to it. We can simulate such a data stream using the Enumerator.generateM method:
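```scala
import play.api.libs.concurrent.Promise
import play.api.libs.iteratee.Enumerator
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Yields Some("alive") 100 ms after each request; it never yields None, so the stream is infinite
val dataStream: Enumerator[String] = Enumerator.generateM {
  Promise.timeout(Some("alive"), 100.millis)
}
```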
In the preceding snippet, the "alive" String is produced roughly every 100 milliseconds. The expression passed to the generateM method (a by-name parameter) is evaluated whenever the Iteratee bound to the Enumerator is in the Cont state, and the stream ends as soon as that expression yields None. This method is used internally to build enumerators and can come in handy when we want to analyze the processing of an expected data stream.
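Because the generator above never yields None, that stream never ends. The following sketch, assuming play-iteratees 2.3.x on the classpath and Scala's global ExecutionContext, replaces the timer with an AtomicInteger counter (a hypothetical stand-in for real device input) so that the stream stops after three heartbeats, and collects them with Iteratee.getChunks. The object name FiniteHeartbeats is hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object; assumes play-iteratees 2.3.x on the classpath.
object FiniteHeartbeats {
  private val counter = new AtomicInteger(0)

  // The by-name argument is re-evaluated each time the Iteratee asks for more input.
  // Returning None ends the stream.
  val heartbeats: Enumerator[String] = Enumerator.generateM {
    val n = counter.incrementAndGet()
    Future.successful(if (n <= 3) Some(s"alive-$n") else None)
  }

  // getChunks accumulates every element into a List, in arrival order
  val received: List[String] =
    Await.result(heartbeats |>>> Iteratee.getChunks[String], 5.seconds)
}
```

The counter lives outside the generator, so applying the same Enumerator a second time would continue from where the counter stopped; a real device feed would behave similarly.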
An Enumerator can be created from a file, an InputStream, or an OutputStream. Enumerators can be concatenated or interleaved. The Enumerator API is documented at https://www.playframework.com/documentation/2.3.x/api/scala/index.html#play.api.libs.iteratee.Enumerator$.
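As a small illustration of combining enumerators, here is a sketch assuming play-iteratees 2.3.x on the classpath and Scala's global ExecutionContext. The >>> operator appends one Enumerator after another, while Enumerator.interleave merges them as elements become available, so the interleaved order is not guaranteed and the check sorts before comparing. The object name CombineDemo is hypothetical.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object; assumes play-iteratees 2.3.x on the classpath.
object CombineDemo {
  val first: Enumerator[String] = Enumerator("a", "b")
  val second: Enumerator[String] = Enumerator("c", "d")

  // Concatenation: every element of first, then every element of second
  val concatenated: List[String] =
    Await.result((first >>> second) |>>> Iteratee.getChunks[String], 5.seconds)

  // Interleaving: elements from both sources, in no guaranteed order
  val interleaved: List[String] =
    Await.result(Enumerator.interleave(first, second) |>>> Iteratee.getChunks[String], 5.seconds)
}
```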
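The Concurrent object is a helper that provides utilities for using Iteratees, Enumerators, and Enumeratees concurrently. Two of its important methods are:

- unicast: creates an Enumerator whose data is pushed, through a Channel, to a single Iteratee
- broadcast: creates an Enumerator together with a Channel, so that the same data can be pushed to multiple Iteratees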
For example, the character count example from the previous section can be implemented with unicast as follows:
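```scala
import play.api.libs.iteratee.{Concurrent, Enumerator}

// onStart receives the Channel; without end(), the resulting Future would never complete
val unicastSrc: Enumerator[String] = Concurrent.unicast[String](channel => {
  words.foreach(w => channel.push(w)) // push word by word, as in the earlier example
  channel.end()
})
val unicastResult: Future[Int] = unicastSrc |>>> sink
```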
The unicast method accepts the onStart, onError, and onComplete handlers. In the preceding code snippet, we provided only the onStart handler, which is mandatory; the other two have default values. The signature of unicast is this:
```scala
def unicast[E](
    onStart: Channel[E] => Unit,
    onComplete: => Unit = (),
    onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ())(
    implicit ec: ExecutionContext): Enumerator[E] {…}
```
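So, to log errors, we can define the onError handler as follows:

```scala
import play.api.Logger

val unicastSrc2 = Concurrent.unicast[String](
  channel => {
    words.foreach(w => channel.push(w))
    channel.end()
  },
  // msg is the error message; input is the Input[String] that caused it
  onError = (msg, input) => Logger.error(s"encountered $msg for $input")
)
```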
Now, let's see how broadcast works.
The broadcast[E] method creates an Enumerator and a Channel and returns them as an (Enumerator[E], Channel[E]) tuple. The enumerator and channel thus obtained can be used to broadcast data to multiple Iteratees:
```scala
import play.api.libs.iteratee.{Concurrent, Enumerator, Iteratee}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val (broadcastSrc: Enumerator[String], channel: Concurrent.Channel[String]) =
  Concurrent.broadcast[String]

private val vowels: Seq[Char] = Seq('a', 'e', 'i', 'o', 'u')

def getVowels(str: String): String =
  str.filter(c => vowels.contains(c.toLower))

// Only letters count as consonants, so punctuation such as ',' and '.' is skipped
def getConsonants(str: String): String =
  str.filter(c => c.isLetter && !vowels.contains(c.toLower))

val vowelCount: Iteratee[String, Int] =
  Iteratee.fold[String, Int](0)((x, y) => x + getVowels(y).length)
val consonantCount: Iteratee[String, Int] =
  Iteratee.fold[String, Int](0)((x, y) => x + getConsonants(y).length)

// Attach both Iteratees before pushing any data; broadcast does not replay earlier chunks
val vowelInfo: Future[Int] = broadcastSrc |>>> vowelCount
val consonantInfo: Future[Int] = broadcastSrc |>>> consonantCount

words.foreach(w => channel.push(w))
channel.end()

vowelInfo onSuccess { case count => println(s"vowels:$count") }
consonantInfo onSuccess { case count => println(s"consonants:$count") }
```
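To know what the two callbacks should print for our sample line, the counts can be worked out directly with plain Scala (no Play dependency; the object name VowelCheck is hypothetical). Counting only letters as consonants, the sentence's 50 letters split into 20 vowels and 30 consonants; a filter that counts every non-vowel character would also count the comma and the full stop, giving 32 consonants instead.

```scala
// Hypothetical helper object; plain Scala, no Play dependency.
object VowelCheck {
  val line: String = "What we need is not the will to believe, but the wish to find out."
  val words: Seq[String] = line.split(" ").toSeq

  private val vowels: Seq[Char] = Seq('a', 'e', 'i', 'o', 'u')

  // Vowels: any character whose lowercase form is a, e, i, o, or u
  val vowelTotal: Int = words.map(w => w.count(c => vowels.contains(c.toLower))).sum

  // Consonants: letters that are not vowels (punctuation excluded)
  val consonantTotal: Int =
    words.map(w => w.count(c => c.isLetter && !vowels.contains(c.toLower))).sum

  // Every non-vowel character, punctuation included
  val nonVowelTotal: Int = words.map(w => w.count(c => !vowels.contains(c.toLower))).sum
}
```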