Flows 

Now that we have the source and the sink, let's implement the flow itself. Again, we will start with the simplest parts and switch to the more complex as we progress.

The easiest flow building block is surely the Cook. It could be implemented as a map function called on the preceding flow definition, but for composability reasons, we'd like to define it separately.

The approach to the flow definition is consistent with the previous two: the Flow constructor is the way to go. The flow is defined in terms of operations on the input, but the definition itself is decoupled from this input. Again, there are lots of methods to choose from; for our purposes, we pick the simple map:

object Cook {
  def formFlow: Flow[Dough, RawCookies, NotUsed] =
    Flow[Dough].map { dough =>
      print(s"Forming $dough - ")
      val result = RawCookies(makeCookies(dough.weight))
      println(result)
      result
    }

  private val cookieWeight = 50
  private def makeCookies(weight: Int): Int = weight / cookieWeight
}

The cook's flow simply maps over the input dough and converts it to the output, raw cookies, as represented by the type annotation.
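The conversion itself is plain integer arithmetic and can be exercised without any streaming machinery. Here is a minimal stand-alone sketch; the Dough and RawCookies case classes are hypothetical stand-ins assumed to match the model introduced earlier in the chapter:

```scala
// Hypothetical stand-ins for the chapter's model classes
final case class Dough(weight: Int)
final case class RawCookies(count: Int)

val cookieWeight = 50
def makeCookies(weight: Int): Int = weight / cookieWeight

// 2350 g of dough yields 47 raw cookies; the remainder is discarded
val cookies = RawCookies(makeCookies(Dough(2350).weight))
// cookies == RawCookies(47)
```

Note that integer division silently drops any dough lighter than cookieWeight, which is acceptable for this toy model.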

The Boy is quite similar to the Cook in the sense that it is a simple building block which transforms its input into the output. There is one caveat though—our Boy needs to communicate with the remote actor in order to do this. 

Akka Streams is built upon Akka and thus offers some possibilities to utilize and communicate with actors at different stages; for instance, it is possible to use an ActorRef as a source or sink. The remoting aspect in this situation turns out to be just an implementation and configuration detail because of Akka's location transparency.

In our use case, the most appropriate way to communicate with a Seller deployed in the remote shop system will be an ask pattern. Let's do this step by step. First, we'll look up a remote actor in order to be able to communicate with it:

def lookupSeller(implicit as: ActorSystem): Future[ActorRef] = {
  val store = "akka.tcp://[email protected]:2553"
  val seller = as.actorSelection(s"$store/user/Seller")
  seller.resolveOne()
}

Given an ActorSystem, we look up an actor using the address of the remote system and an actor path. We know there should be exactly one actor, so we resolve a single reference. (Note that resolveOne needs a timeout, given explicitly or as an implicit, within which the actor must be found.) Depending on the result of the lookup, the returned Future will hold either the reference we need or a failure with ActorNotFound. The failure will be propagated downstream as an error and will lead to the termination of the stream because we don't define how to handle it. Let's call this the desired behavior: without a seller, we won't be able to convert a shopping list into groceries.

We can use a Future[ActorRef] to talk to the actor: 

def goShopping(implicit as: ActorSystem, ec: ExecutionContext): Future[Flow[ShoppingList, Groceries, NotUsed]] =
  lookupSeller.map { ref =>
    Flow[ShoppingList].ask[Groceries](ref)
  }

Here, we need not only an ActorSystem but also an ExecutionContext in order to map over the Future returned by lookupSeller. We use the resolved actor reference (if there is one) as a parameter to Flow.ask. The type of the Flow corresponds to the expected input type, and the type parameter of ask to the expected output type. ask also expects an implicit Timeout in scope, bounding how long each element may wait for the seller's reply.

Now we can use another Flow constructor to convert the Future[Flow] into a normal Flow:

def shopFlow(implicit as: ActorSystem, ec: ExecutionContext): Flow[ShoppingList, Groceries, Future[Option[NotUsed]]] =
  Flow.lazyInitAsync { () => goShopping }

The lazyInitAsync constructor turns the Flow wrapped in a Future into a normal Flow. This sub-flow has the proper input and output types, so we can plug it into our flow definition later.

It is important to extend the configuration in application.conf with the properties needed for Akka remoting, as described in Chapter 11, An Introduction to the Akka and Actor Models.
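For reference, a classic-remoting configuration along these lines could look as follows. This is a sketch, not the chapter's exact configuration: the hostname and port are placeholders chosen to match the lookup address used above for the shop system; the bakery side would bind to another free port.

```hocon
akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553   # the store system's port from the lookup address
    }
  }
}
```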

The next composite step we're going to implement is a Baker, including its constituent Oven.

The Oven needs to spend some time turning raw cookies into edible cookies and we could implement this by introducing a bit of blocking behavior. But doing so will affect the rest of the system by needlessly consuming available threads. Because of this, we'll use another feature of Akka Streams, Flow.delay, which allows us to shift the emission of elements in time:

def bakeFlow: Flow[RawCookies, ReadyCookies, NotUsed] =
  Flow[RawCookies]
    .delay(bakingTime, DelayOverflowStrategy.backpressure)
    .addAttributes(Attributes.inputBuffer(1, 1))
    .map(bake)

As we only have one Oven, we define the buffer to have an initial and maximum size of 1. We also don't want to drop arriving raw cookies or release cookies that are not ready yet, so we choose the backpressure overflow strategy.

The bake method is a trivial conversion once again:

private def bake(c: RawCookies): ReadyCookies = {
  assert(c.count == ovenSize)
  ReadyCookies(c.count)
}

Now, with this Oven we can define a Baker which we planned to give a type of BidiFlow:

def bakeFlow = BidiFlow.fromFlows(inFlow, outFlow)

In order to do this, we need to separately define the inFlow and outFlow for both flow directions.

The outFlow just passes ready cookies on to the consumer, and we already know how to do that:

private def outFlow = Flow[ReadyCookies]

The inFlow is a bit more involved because we need to regroup incoming raw cookies from batches of arbitrary size into batches of the oven's size. We'll do this by defining a sub-source of single cookies and then grouping them as desired. Here is the first step:

def extractFromBox(c: RawCookies) = Source(List.fill(c.count)(RawCookies(1)))

We're creating a source that emits one single-cookie element per cookie in the box. The regrouping logic looks like this:

val inFlow = Flow[RawCookies]
  .flatMapConcat(extractFromBox)
  .grouped(Oven.ovenSize)
  .map(_.reduce(_ + _))

The flatMapConcat consumes one source after another and concatenates the results. We then group the stream of single cookies into a stream of List[RawCookies] of size ovenSize. Lastly, we reduce each list of single cookies into a single RawCookies(ovenSize), as the Oven expects.
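The same regrouping can be mirrored on plain collections to see what the stream stages do, setting aside back pressure and timing. A self-contained sketch, assuming a hypothetical ovenSize of 12 and a + operation on RawCookies as in the chapter's model:

```scala
// Hypothetical stand-in for the chapter's model class
final case class RawCookies(count: Int) {
  def +(that: RawCookies): RawCookies = RawCookies(count + that.count)
}

val ovenSize = 12

// The collection analogue of extractFromBox: one element per cookie
def extractFromBox(c: RawCookies): List[RawCookies] =
  List.fill(c.count)(RawCookies(1))

// Boxes of arbitrary sizes in, oven-sized batches out
val boxes = List(RawCookies(5), RawCookies(20), RawCookies(11))
val batches = boxes
  .flatMap(extractFromBox)   // 36 single cookies
  .grouped(ovenSize)         // groups of 12
  .toList
  .map(_.reduce(_ + _))      // one RawCookies(12) per group
// batches == List(RawCookies(12), RawCookies(12), RawCookies(12))
```

Note that if the total count is not a multiple of ovenSize, grouped emits a smaller trailing batch, which the Oven's assertion would then reject; the streaming version has the same property.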

Now we can combine a baker's BidiFlow and oven's Flow into the composite Flow by joining them:

val bakerFlow: Flow[RawCookies, ReadyCookies, NotUsed] =
  Baker.bakeFlow.join(Oven.bakeFlow)

The join method adds a given Flow as a final transformation to the stack of BidiFlows. In our case, the size of the stack is one and the type of the resulting flow is Flow[RawCookies, ReadyCookies, NotUsed]. The resulting sub-flow hides all of the details of regrouping the cookies and waiting for their readiness, leaving us with a nice definition.
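Setting the streaming machinery aside, the joined stage behaves like the following plain-collection pipeline. This is a sketch with a hypothetical ovenSize of 12; the real flow additionally delays each batch by bakingTime and back-pressures the upstream:

```scala
// Hypothetical stand-ins for the chapter's model classes
final case class RawCookies(count: Int) {
  def +(that: RawCookies): RawCookies = RawCookies(count + that.count)
}
final case class ReadyCookies(count: Int)

val ovenSize = 12

// Baker's inFlow: split boxes into single cookies, regroup to oven-sized batches
def regroup(in: List[RawCookies]): List[RawCookies] =
  in.flatMap(c => List.fill(c.count)(RawCookies(1)))
    .grouped(ovenSize)
    .toList
    .map(_.reduce(_ + _))

// Oven (joined in the middle), then Baker's outFlow passes results through
def bake(c: RawCookies): ReadyCookies = ReadyCookies(c.count)

val ready = regroup(List(RawCookies(24))).map(bake)
// ready == List(ReadyCookies(12), ReadyCookies(12))
```

The join wires the Oven between the Baker's in and out sides, so from the outside only boxes of raw cookies go in and batches of ready cookies come out.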
