Message Broker API

When we implemented persistence in Chef, we skipped the implementation of resultsTopic, which we declared in the API definition. Let's take a look at it now:

class ChefServiceImpl(...) extends ChefService {

  ...

  override def resultsTopic: Topic[Dough] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntities
        .eventStream(ChefModel.EventTag, fromOffset)
        .map { ev => (convertEvent(ev), ev.offset) }
    }

  private def convertEvent(chefEvent: EventStreamElement[ChefEvent]): Dough = {
    chefEvent.event match {
      case MixingDone(_, dough) => dough
    }
  }
}

We use the singleStreamWithOffset method of the TopicProducer factory to construct a topic to which all events tagged with ChefModel.EventTag are published. Before publishing, the convertEvent method converts each ChefEvent into the Dough expected by the downstream services. Each converted message is emitted together with the offset of the event it came from.
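To make the role of the offset more concrete, here is a minimal plain-Scala model of the idea. It is only a sketch and not Lagom's implementation: the names OffsetSketch, StoredEvent, journal, eventsFrom, and publishFrom are hypothetical, Dough is a stand-in for the real class, and offsets are simplified to Long sequence numbers.

```scala
// A simplified, library-free model of offset-based publishing.
// All names here are hypothetical and are not part of Lagom's API.
object OffsetSketch {
  final case class Dough(weight: Int) // stand-in for the real Dough
  final case class StoredEvent(offset: Long, dough: Dough)

  // Pretend event journal: every persisted event carries a growing offset.
  val journal: Vector[StoredEvent] = Vector(
    StoredEvent(1L, Dough(100)),
    StoredEvent(2L, Dough(200)),
    StoredEvent(3L, Dough(300))
  )

  // Models eventStream(tag, fromOffset): in this sketch, only events
  // strictly after fromOffset are returned.
  def eventsFrom(fromOffset: Long): Iterator[StoredEvent] =
    journal.iterator.filter(_.offset > fromOffset)

  // Models the function passed to singleStreamWithOffset: each outgoing
  // message is paired with the offset of the event it was built from.
  def publishFrom(fromOffset: Long): Iterator[(Dough, Long)] =
    eventsFrom(fromOffset).map(ev => (ev.dough, ev.offset))
}
```

In this model, OffsetSketch.publishFrom(2L).toList contains only the pair for Dough(300) with offset 3. Resuming from the last recorded offset is what allows publishing to continue after a restart without starting from the first event again.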

The receiving side is Manager. Lagom provides all of the infrastructure so that the consumption of the events boils down to the following one-liner:

val sub: Future[Done] = chefService.resultsTopic.subscribe.atLeastOnce(cookChefFlow)

Here, we use chefService.resultsTopic to subscribe to the events. We provide cookChefFlow, a flow through which each published event is passed at least once. The atLeastOnce method expects an akka.stream.scaladsl.Flow[Payload, Done, _] as a parameter, where Payload is the type of the message. We'll define our flow, of type Flow[Dough, Done, _], in a moment.
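The at-least-once guarantee has a practical consequence: the same Dough may reach the subscriber more than once, for example when processing fails before the message is acknowledged. The following plain-Scala sketch models this with a simple retry loop. It does not use Lagom or Akka, and the names AtLeastOnceSketch, deliver, and demo are hypothetical; the real redelivery is handled by Lagom and the message broker.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

// A library-free model of at-least-once delivery.
// All names here are hypothetical and are not part of Lagom's API.
object AtLeastOnceSketch {
  final case class Dough(weight: Int) // stand-in for the real Dough

  // Redelivers each message until the handler completes without throwing.
  // A failed attempt is retried, so the handler can see a message twice.
  def deliver(messages: List[Dough])(handler: Dough => Unit): Unit =
    messages.foreach { msg =>
      var acknowledged = false
      while (!acknowledged) acknowledged = Try(handler(msg)).isSuccess
    }

  // Scenario: the handler records Dough(200) but fails before the first
  // acknowledgement, so that message is delivered a second time.
  def demo(): (List[Dough], Set[Dough]) = {
    val seen = ListBuffer.empty[Dough]
    var failedOnce = false
    deliver(List(Dough(100), Dough(200))) { dough =>
      seen += dough
      if (dough.weight == 200 && !failedOnce) {
        failedOnce = true
        throw new RuntimeException("simulated failure before acknowledgement")
      }
    }
    (seen.toList, seen.toSet)
  }
}
```

Calling AtLeastOnceSketch.demo() records Dough(200) twice in the list of seen messages, while the set of distinct messages still has two elements. A subscriber therefore benefits from idempotent processing, so that a repeated message does not change the outcome.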
