Sources and sinks

The simplest component of our flow is probably the Consumer. It is just a sink that prints out information about incoming data. To construct it, we'll use the Sink factory, as follows:

val consumer: Sink[ReadyCookies, Future[Done]] =
  Sink.foreach(cookie => println(s"$cookie, yummi..."))

The Sink factory offers more than two dozen different constructors to define a sink. We're using one of the simplest, which invokes a provided function for each element of the stream.

Here we see that its full type is Sink[ReadyCookies, Future[Done]]. This reflects both the type of the elements, ReadyCookies, and the type the Sink is materialized to. In this case, it is materialized into a Future[Done] that completes with Success if the stream reaches its end and with Failure if there is a failure in the stream.
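To make the materialized value more tangible, here is a minimal sketch that runs the consumer against a finite source and against a failing one, and reports how the Future[Done] completed. It assumes Akka 2.6 or later, where the implicit ActorSystem also supplies the materializer. The ReadyCookies case class, the SinkMaterializationSketch object, and the outcome helper are hypothetical stand-ins, not part of the bakery code.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Try

object SinkMaterializationSketch {
  // Hypothetical stand-in for the bakery's ReadyCookies type.
  final case class ReadyCookies(count: Int)

  // Runs the consumer against the given source and returns how the
  // materialized Future[Done] completed.
  def outcome(cookies: Source[ReadyCookies, _]): Try[Done] = {
    // Assumption: Akka 2.6+, so the system doubles as the materializer.
    implicit val system: ActorSystem = ActorSystem("sink-sketch")
    try {
      val consumer: Sink[ReadyCookies, Future[Done]] =
        Sink.foreach(cookie => println(s"$cookie, yummi..."))
      // runWith keeps the sink's materialized value.
      val done: Future[Done] = cookies.runWith(consumer)
      // Blocking is acceptable in a demo; Try captures a failed future.
      Try(Await.result(done, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    // A finite source completes normally, so the future succeeds.
    println(outcome(Source(List(ReadyCookies(12), ReadyCookies(24)))))
    // A failing source fails the future with the same exception.
    println(outcome(Source.failed[ReadyCookies](new RuntimeException("oven is broken"))))
  }
}
```

Blocking with Await is used here only to keep the sketch short; in the real flow we would rather react to the future's completion asynchronously.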

Now we'll take a look at the opposite end of the stream and define a source. The Source factory similarly provides almost three dozen different methods to create a source. We don't want to overwhelm our bakery's team with work, so we decided to use a timed source of data:

import scala.concurrent.duration._

private val delay = 1.second
private val interval = 1.second

val manager1: Source[NotUsed, Cancellable] =
  Source.tick(delay, interval, NotUsed)

This represents the first block in our composite Source, and its type does not fit our Boy, so we need to implement the second block of the diagram, the generator, and connect the two together. This is more easily done than explained:

val manager: Source[ShoppingList, Cancellable] =
  Source.tick(delay, interval, NotUsed).map { _ =>
    shoppingList
  }

We basically just map over the input but ignore it and return a shoppingList instead. Now our Source has a proper type so that we can connect a Boy to it later.
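The following sketch shows one way to run such a timed source in isolation and to use its materialized Cancellable. It assumes Akka 2.6 or later. The ShoppingList case class, the fixed shoppingList value, the TickSourceSketch object, and the firstLists helper are hypothetical placeholders; the bakery's real generator produces random lists.

```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object TickSourceSketch {
  // Hypothetical stand-ins for the book's ShoppingList and its generator.
  final case class ShoppingList(eggs: Int, flour: Int)
  def shoppingList: ShoppingList = ShoppingList(eggs = 2, flour = 500)

  // Same shape as the manager above: a timer tick replaced by a shopping list.
  def manager(delay: FiniteDuration, interval: FiniteDuration): Source[ShoppingList, Cancellable] =
    Source.tick(delay, interval, NotUsed).map(_ => shoppingList)

  // Collects the first n shopping lists and then stops the timer.
  def firstLists(n: Int): Seq[ShoppingList] = {
    implicit val system: ActorSystem = ActorSystem("tick-sketch")
    try {
      // Keep.both retains the source's Cancellable and the sink's Future.
      val (ticker, lists) = manager(10.millis, 10.millis)
        .take(n)
        .toMat(Sink.seq[ShoppingList])(Keep.both)
        .run()
      val result = Await.result(lists, 5.seconds)
      ticker.cancel() // harmless here, since take(n) has already stopped the stream
      result
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    firstLists(3).foreach(println)
}
```

The short 10 millisecond interval is chosen only so that the sketch finishes quickly; the manager in the text uses one second.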

There is a subtle aspect of this implementation that we didn't take into account. We have a predefined interval with the intention that the rest of the flow is not overwhelmed with requests. But at the same time, we're about to rely on the back pressure from the Oven for the same purpose. This is not optimal: if we pick too big an interval, our bakery will be under-utilized, and if we pick too small an interval, it is the back pressure that will manage the flow anyway. We can simplify our source so that it just produces shopping lists and puts them into the pipeline as soon as there is some downstream capacity available:

val manager: Source[ShoppingList, NotUsed] =
  Source.repeat(NotUsed).map(_ => shoppingList)

Here, we just repeat the NotUsed element (which provides a nice syntax) and then replace it with a random shopping list, as before. The difference is that the manager will generate a shopping list whenever there is downstream demand, without potentially waiting too long because of the timer settings.
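As a rough illustration of this demand-driven behavior, the sketch below attaches a deliberately slow downstream to the repeating source. The throttle stage is only an assumed stand-in for the rest of the bakery, and ShoppingList, shoppingList, RepeatSourceSketch, and firstLists are hypothetical names. Akka 2.6 or later is assumed.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object RepeatSourceSketch {
  // Hypothetical stand-ins for the book's ShoppingList and its generator.
  final case class ShoppingList(eggs: Int, flour: Int)
  def shoppingList: ShoppingList = ShoppingList(eggs = 2, flour = 500)

  // An endless source that emits a list only when downstream asks for one.
  val manager: Source[ShoppingList, NotUsed] =
    Source.repeat(NotUsed).map(_ => shoppingList)

  // Pulls n lists through a slow stage that stands in for the bakery.
  def firstLists(n: Int): Seq[ShoppingList] = {
    implicit val system: ActorSystem = ActorSystem("repeat-sketch")
    try {
      val lists = manager
        .throttle(1, 50.millis) // downstream accepts one list per 50 ms
        .take(n)                // completes the otherwise infinite stream
        .runWith(Sink.seq[ShoppingList])
      Await.result(lists, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    firstLists(3).foreach(println)
}
```

Although the source itself never ends, the run terminates after n elements because nothing is produced unless the downstream signals demand, so the pace is set entirely by the slower stage.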
