
The final piece of our flow is a Chef. It incorporates work management across Mixers. Let's implement Mixers first.

The mixing behavior itself is straightforward, but to mimic real hardware we include a blocking delay for the duration of mixing:

def mix(g: Groceries) = {
  import g._
  Dough(eggs * 50 + flour + sugar + chocolate)
}
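
To try mix in isolation, a minimal sketch could look like the following. The Groceries and Dough definitions and the mixTime value are assumptions made for illustration, since this section does not show them; the delay is simulated with Thread.sleep.

```scala
import scala.concurrent.duration._

object MixDemo {
  // Assumed shapes of the domain types (not shown in this section).
  final case class Groceries(eggs: Int, flour: Int, sugar: Int, chocolate: Int)
  final case class Dough(weight: Int)

  // Hypothetical mixing time, kept short so the example runs quickly.
  val mixTime: FiniteDuration = 100.millis

  def mix(g: Groceries): Dough = {
    // Block the calling thread for the mixing time, like a busy mixer.
    Thread.sleep(mixTime.toMillis)
    import g._
    Dough(eggs * 50 + flour + sugar + chocolate)
  }
}
```

With these assumed types, MixDemo.mix(MixDemo.Groceries(2, 100, 50, 25)) returns Dough(275) after roughly 100 milliseconds, and the calling thread can do nothing else in the meantime.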

Because the mixing behavior blocks its thread, we need to use a special async flow constructor to start a separate thread for every mixer. To better control how threads are assigned, we add to the configuration a definition of a separate pinned thread dispatcher, which assigns one thread per sub-flow:

mixers-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

With this definition in place, we are now able to define the blocking mixing behavior:

private def subMixFlow: Flow[Groceries, Dough, NotUsed] =
  Flow[Groceries].async("mixers-dispatcher", 1).map(mix)

The async constructor takes an input buffer size as its second parameter. We pass 1 because we do not want our mixers to have any large buffers assigned to them.
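
The reason for giving every blocking mixer its own thread can be illustrated with a plain-Scala analogy. This sketch does not use Akka at all: the PinnedThreadsDemo object and the runPinned helper are hypothetical names, and a single-thread executor per worker stands in for the pinned dispatcher.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PinnedThreadsDemo {
  // Runs `job` once per worker, each on its own single-thread executor,
  // and returns the name of the thread that executed each job.
  def runPinned(workers: Int)(job: () => Unit): Seq[String] = {
    val executors = List.fill(workers)(Executors.newSingleThreadExecutor())
    try {
      val futures = executors.map { executor =>
        val ec = ExecutionContext.fromExecutor(executor)
        Future { job(); Thread.currentThread().getName }(ec)
      }
      futures.map(f => Await.result(f, 5.seconds))
    } finally executors.foreach(_.shutdown())
  }
}
```

Calling PinnedThreadsDemo.runPinned(3)(() => Thread.sleep(50)) returns three different thread names: each blocking job occupies only its own thread, so one slow mixer cannot starve the others.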

The work management can be implemented as a separate concept which closely resembles one of the recipes from the Akka Streams documentation cookbook—the Balancer. It takes a worker subFlow and a count of workers and constructs a graph with the given number of workers:

import GraphDSL.Implicits._

def createGraph[Out, In](subFlow: Flow[In, Out, Any], count: Int) = {
  val balanceBlock = Balance[In](count, waitForAllDownstreams = false)
  val mergeBlock = Merge[Out](count, eagerComplete = false)
  GraphDSL.create() { implicit builder =>
    val balancer = builder.add(balanceBlock)
    val merge = builder.add(mergeBlock)

    for (_ <- 1 to count) balancer ~> subFlow ~> merge

    FlowShape(balancer.in, merge.out)
  }
}

The Balance block is a fan-out flow with several outputs. It hands each stream element to the first worker that has demand, which keeps all of the workers busy. With waitForAllDownstreams = false we specify that the distribution can start as soon as at least one of the workers demands a job. With true, the block would instead wait for all of the workers to demand a job before distributing anything. The Merge is a fan-in block with a specified number of inputs. By specifying eagerComplete = false we tell it to wait for all of its upstreams (the workers) to complete, as opposed to completing as soon as one of the workers is done.

Then we construct a graph using GraphDSL.create() and provide the actual graph-building logic as a parameter. First, we convert balanceBlock and mergeBlock into Shapes by adding them to the builder. Then we connect as many sub-flows as needed to the balancer and merge using the ~> syntax provided by the import GraphDSL.Implicits._. The for comprehension for four workers would be equivalent to the following plain definition:

balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge

Having this graph defined, we can specify the rest of the Balancer flow using another Flow constructor:

def apply[In, Out](subFlow: Flow[In, Out, Any],
                   count: Int): Flow[In, Out, NotUsed] =
  Flow.fromGraph(createGraph(subFlow, count))
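
To see such a balancer at work outside the bakery, here is a minimal sketch that pushes ten integers through three doubling workers. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream, and the akka-stream dependency on the classpath. BalancerDemo, balancer, and run are hypothetical names, and the wiring is condensed into a single method for brevity.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BalancerDemo {
  // Same Balance ~> worker ~> Merge wiring as in the text.
  def balancer[In, Out](worker: Flow[In, Out, Any], count: Int): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val balance = builder.add(Balance[In](count, waitForAllDownstreams = false))
      val merge = builder.add(Merge[Out](count, eagerComplete = false))
      for (_ <- 1 to count) balance ~> worker ~> merge
      FlowShape(balance.in, merge.out)
    })

  // Runs ten integers through three doubling workers and collects the results.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("balancer-demo")
    try {
      val result = Source(1 to 10)
        .via(balancer(Flow[Int].map(_ * 2), 3))
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Every input comes out exactly once, but the merge makes no ordering promise across workers, so any check should compare the sorted output with the expected values.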

We can use it to construct our Chef sub-flow:

def mixFlow: Flow[Groceries, Dough, NotUsed] =
  Flow[Groceries]
    .map(splitByMixer)
    .flatMapConcat(mixInParallel)

def splitByMixer(g: Groceries) = {
  import g._
  val single = Groceries(1, flour / eggs, sugar / eggs, chocolate / eggs)
  List.fill(eggs)(single)
}

def mixInParallel(list: List[Groceries]) =
  Source(list)
    .via(Balancer(subMixFlow, list.size))
    .grouped(list.size)
    .map(_.reduce(_ + _))

Here again, we split the Groceries into a stream of smaller portions, mix each portion in parallel on a dedicated mixer, and combine the results using the same technique we used before with the Baker and Oven.
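
The arithmetic of splitting and recombining can be checked without any streams. The sketch below is plain Scala under stated assumptions: Groceries and Dough are integer-valued case classes, Dough has a + operator, splitByMixer produces one single-egg portion per egg, and mix omits the blocking delay. SplitDemo and mixAll are hypothetical names.

```scala
object SplitDemo {
  // Assumed domain types; the originals are not shown in this section.
  final case class Groceries(eggs: Int, flour: Int, sugar: Int, chocolate: Int)
  final case class Dough(weight: Int) {
    def +(other: Dough): Dough = Dough(weight + other.weight)
  }

  // Same formula as mix in the text, without the blocking delay.
  def mix(g: Groceries): Dough =
    Dough(g.eggs * 50 + g.flour + g.sugar + g.chocolate)

  // One single-egg portion per egg; integer division drops any remainder.
  def splitByMixer(g: Groceries): List[Groceries] = {
    import g._
    val single = Groceries(1, flour / eggs, sugar / eggs, chocolate / eggs)
    List.fill(eggs)(single)
  }

  // Sequential stand-in for the parallel stream: split, mix each, sum up.
  def mixAll(g: Groceries): Dough = splitByMixer(g).map(mix).reduce(_ + _)
}
```

For Groceries(4, 400, 200, 100) each portion is Groceries(1, 100, 50, 25), each mixes into Dough(225), and the sum is Dough(900), the same as mixing everything at once. With Groceries(3, 100, 100, 100) the integer division loses a unit of every ingredient, giving Dough(447) instead of Dough(450). Under these assumptions, a Groceries value with zero eggs would fail with a division by zero.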

