Graphs

The final piece of our flow is a Chef. It incorporates work management across Mixers. Let's implement Mixers first.

The mixing behavior itself is straightforward but to mimic real hardware we include a block for the time of mixing:

def mix(g: Groceries) = {
Thread.sleep(mixTime.toMillis)
import g._
Dough(eggs * 50 + flour + sugar + chocolate)
}

Because of the mixing behavior, we need to use a special async flow constructor to start a separate thread for every mixer. In order to better control how threads are assigned, we'll put into the configuration a definition of the separate pinned thread dispatcher which assigns one thread per sub-flow:

mixers-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}

With this definition in place, we are now able to define the blocking mixing behavior:

private def subMixFlow: Flow[Groceries, Dough, NotUsed] =
Flow[Groceries].async("mixers-dispatcher", 1).map(mix)

The async constructor takes a buffer size as a parameter and we want our mixers not to have any large buffers assigned to them.

The work management can be implemented as a separate concept which closely resembles one of the recipes from the Akka Streams documentation cookbook—the Balancer. It takes a worker subFlow and a count of workers and constructs a graph with the given number of workers:

import akka.stream.scaladsl.GraphDSL
import GraphDSL.Implicits._

def createGraph[Out, In](subFlow: Flow[In, Out, Any], count: Int) = {
val balanceBlock = Balance[In](count, waitForAllDownstreams = false)
val mergeBlock = Merge[Out](count, eagerComplete = false)
GraphDSL.create() { implicit builder
val balancer = builder.add(balanceBlock)
val merge = builder.add(mergeBlock)

for (_ ← 1 to count) balancer ~> subFlow ~> merge

FlowShape(balancer.in, merge.out)
}
}

The Balance block is a fan-out flow with several outputs. It distributes stream elements evenly between the workers. With waitForAllDownstreams = false we specify that the distribution can start as soon as at least one of the workers demands a job. With false we change the behavior to wait for all of the workers to demand a job before it will be distributed. The Merge is a fan-in block with a specified number of inputs. By specifying eagerComplete = false we tell it to wait for all down streams to complete as compared to completing as soon as one of the workers is done.

Then we construct a graph using GraphDSL.create() and provide actual graph building logic as a parameter. First, we convert balanceBlock and mergeBlock into Shapes by adding them to the builder. Then we connect as many sub-flows as needed to the balancer and merge using the ~> syntax provided by the import GraphDSL.Implicits._. The for comprehension for five workers would be equivalent to the following plain definition:

balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge

Having this graph defined, we can specify the rest of the Balancer flow using another Flow constructor:

def apply[In, Out](subFlow: Flow[In, Out, Any],
count: Int): Flow[In, Out, NotUsed] =
Flow.fromGraph(createGraph(subFlow, count))

We can use it to construct our Chef sub-flow:

def mixFlow: Flow[Groceries, Dough, NotUsed] =
Flow[Groceries]
.map(splitByMixer)
.flatMapConcat(mixInParallel)

def
splitByMixer(g: Groceries) = {
import g._
val single = Groceries(1, flour / eggs, sugar / eggs, chocolate / eggs)
List.fill(g.eggs)(single)
}

def mixInParallel(list: List[Groceries]) =
Source(list)
.via(Balancer(subMixFlow, list.size))
.grouped(list.size)
.map(_.reduce(_ + _))

Here, again we split Groceries into a stream of smaller portions, mix each of these portions using a dedicated mixer in parallel, and combine them using the same technique we used before with the Baker and Oven.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.133.132.99