The final piece of our flow is a Chef. It incorporates work management across Mixers. Let's implement Mixers first.
The mixing behavior itself is straightforward but to mimic real hardware we include a block for the time of mixing:
def mix(g: Groceries) = {
Thread.sleep(mixTime.toMillis)
import g._
Dough(eggs * 50 + flour + sugar + chocolate)
}
Because of the mixing behavior, we need to use a special async flow constructor to start a separate thread for every mixer. In order to better control how threads are assigned, we'll put into the configuration a definition of the separate pinned thread dispatcher which assigns one thread per sub-flow:
mixers-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
With this definition in place, we are now able to define the blocking mixing behavior:
private def subMixFlow: Flow[Groceries, Dough, NotUsed] =
Flow[Groceries].async("mixers-dispatcher", 1).map(mix)
The async constructor takes a buffer size as a parameter and we want our mixers not to have any large buffers assigned to them.
The work management can be implemented as a separate concept which closely resembles one of the recipes from the Akka Streams documentation cookbook—the Balancer. It takes a worker subFlow and a count of workers and constructs a graph with the given number of workers:
import akka.stream.scaladsl.GraphDSL
import GraphDSL.Implicits._
def createGraph[Out, In](subFlow: Flow[In, Out, Any], count: Int) = {
val balanceBlock = Balance[In](count, waitForAllDownstreams = false)
val mergeBlock = Merge[Out](count, eagerComplete = false)
GraphDSL.create() { implicit builder ⇒
val balancer = builder.add(balanceBlock)
val merge = builder.add(mergeBlock)
for (_ ← 1 to count) balancer ~> subFlow ~> merge
FlowShape(balancer.in, merge.out)
}
}
The Balance block is a fan-out flow with several outputs. It distributes stream elements evenly between the workers. With waitForAllDownstreams = false we specify that the distribution can start as soon as at least one of the workers demands a job. With false we change the behavior to wait for all of the workers to demand a job before it will be distributed. The Merge is a fan-in block with a specified number of inputs. By specifying eagerComplete = false we tell it to wait for all down streams to complete as compared to completing as soon as one of the workers is done.
Then we construct a graph using GraphDSL.create() and provide actual graph building logic as a parameter. First, we convert balanceBlock and mergeBlock into Shapes by adding them to the builder. Then we connect as many sub-flows as needed to the balancer and merge using the ~> syntax provided by the import GraphDSL.Implicits._. The for comprehension for five workers would be equivalent to the following plain definition:
balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge
balancer ~> subFlow ~> merge
Having this graph defined, we can specify the rest of the Balancer flow using another Flow constructor:
def apply[In, Out](subFlow: Flow[In, Out, Any],
count: Int): Flow[In, Out, NotUsed] =
Flow.fromGraph(createGraph(subFlow, count))
We can use it to construct our Chef sub-flow:
def mixFlow: Flow[Groceries, Dough, NotUsed] =
Flow[Groceries]
.map(splitByMixer)
.flatMapConcat(mixInParallel)
def splitByMixer(g: Groceries) = {
import g._
val single = Groceries(1, flour / eggs, sugar / eggs, chocolate / eggs)
List.fill(g.eggs)(single)
}
def mixInParallel(list: List[Groceries]) =
Source(list)
.via(Balancer(subMixFlow, list.size))
.grouped(list.size)
.map(_.reduce(_ + _))
Here, again we split Groceries into a stream of smaller portions, mix each of these portions using a dedicated mixer in parallel, and combine them using the same technique we used before with the Baker and Oven.