Persistence API

In our scenario, Chef takes some time to complete the mixing process. Because of this, we would like to store the work in progress so that it is not lost if the service is restarted: after recovery, the process simply continues from where it was interrupted.

We will use persistence facilities provided by the framework in order to implement this. The recommended way to persist data in Lagom is by utilizing an event sourcing approach, which we already used to implement an example project in Chapter 14, Project 1 - Building Microservices with Scala. Lagom automates data schema creation with Cassandra and also provides a Cassandra instance for development purposes. Therefore, we can start directly with the definition of the data model. As in the previous chapter, we need to provide a set of commands and events and also have an internal representation of the state. The following few snippets show one of the possible ways to represent these parts. As this model is just an implementation detail of Chef, it goes into the chef-impl module.

First, we need to have a bunch of imports in scope:

import ch15.model._
import java.util.UUID
import akka.Done
import com.lightbend.lagom.scaladsl.persistence._
import PersistentEntity.ReplyType
import com.lightbend.lagom.scaladsl.playjson.JsonSerializer
import play.api.libs.json._

Having these, we can define our commands:

sealed trait ChefCommand
final case class MixCommand(groceries: Groceries) extends ChefCommand with ReplyType[Done]
final case class DoneCommand(id: UUID) extends ChefCommand with ReplyType[Done]

MixCommand represents an incoming request to mix some groceries. Commands in Lagom define the expected response type and we are using Akka's Done for the response. The reason for this is that we'll always accept MixCommand (because there is no reason not to), but at the moment the command is accepted it is not possible to predict which effect it will have.

DoneCommand represents a state transition from "mixing in progress" to "mixing done". It will be an internal command sent by Chef to itself. Technically we don't need a response type here but we have to use Done again in order to make the compiler happy. id represents the unique identifier of the mixing job. Where does it come from? It is generated at the moment we're creating an event from the command:

sealed trait ChefEvent
final case class Mixing(id: UUID, groceries: Groceries) extends ChefEvent
final case class MixingDone(id: UUID, dough: Dough)
  extends ChefEvent with AggregateEvent[MixingDone] {
  override def aggregateTag: AggregateEventTag[MixingDone] = ChefModel.EventTag
}

The Mixing event is created in response to the MixCommand, and the MixingDone event in response to the DoneCommand. The two events are related to each other via the id property. At recovery time, two events with the same id annihilate each other: the presence of both means that a mixing job was started and then finished in the past. In contrast, if there is only a single event, we can conclude that the job was not finished. Unbalanced Mixing events after recovery mean that we need to restart the mixing process for them. Unbalanced MixingDone events can only mean a programming error.
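The pairing rule can be illustrated without Lagom as a fold over the journal. The following is a minimal sketch in plain Scala: the model classes are simplified stand-ins for the chapter's types (the Int fields of Groceries and Dough are assumptions), and RecoverySketch and pendingJobs are hypothetical names used only for illustration.

```scala
import java.util.UUID

// Hypothetical, simplified stand-ins for the chapter's model (field types are assumptions).
object RecoverySketch {
  final case class Groceries(eggs: Int, flour: Int, sugar: Int, chocolate: Int)
  final case class Dough(weight: Int)

  sealed trait ChefEvent
  final case class Mixing(id: UUID, groceries: Groceries) extends ChefEvent
  final case class MixingDone(id: UUID, dough: Dough) extends ChefEvent

  // Replays the journal in order: a Mixing event opens a job and a
  // MixingDone event with the same id closes it. Whatever is left open
  // at the end is an unbalanced Mixing event, that is, an unfinished job.
  def pendingJobs(journal: Seq[ChefEvent]): List[Mixing] =
    journal.foldLeft(List.empty[Mixing]) {
      case (open, m: Mixing)         => open :+ m
      case (open, MixingDone(id, _)) => open.filterNot(_.id == id)
    }
}
```

Note that in this sketch a MixingDone event without a matching Mixing event is silently ignored rather than reported as an error.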

To provide this functionality, we'll define the state as follows:

sealed trait ChefState {
  def batches: List[Mixing]
}
final case class MixingState(batches: List[Mixing]) extends ChefState

We'll take a look at how it is used in the service implementation in a moment, after discussing the final bit of the model definition:

object ChefModel {
  import play.api.libs.json._
  implicit val mixingFormat = Json.format[Mixing]

  val serializers = List(
    JsonSerializer(mixingFormat),
    JsonSerializer(Json.format[MixingDone]),
    JsonSerializer(Json.format[MixingState]))

  val EventTag: AggregateEventTag[MixingDone] = AggregateEventTag[MixingDone]("MixingDone")
}

Here, we provide serializers for our events and the state in the same way we did in the previous chapter. Lagom's recommended serialization format is JSON, so we're using the same approach we already used for the shared model definition.

We also define EventTag which we'll need to implement the read-side of the event journal in order to notify Manager about the completed mixing jobs.

The final piece of configuration we need is the definition of the Cassandra keyspace for Chef. This is done in the usual way in application.conf:

user.cassandra.keyspace = chefprogress

cassandra-journal.keyspace = ${user.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${user.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${user.cassandra.keyspace}

The definition of the service reflects the fact that the communication is synchronous on the request side and message based on the response side:

trait ChefService extends Service {
  def mix: ServiceCall[Groceries, Done]

  def resultsTopic: Topic[Dough]

  override def descriptor: Descriptor = {
    named("ChefService")
      .withCalls(call(mix))
      .withTopics(topic(ChefService.ResultsTopic, resultsTopic))
      .withAutoAcl(true)
  }
}
object ChefService {
  val ResultsTopic = "MixedResults"
}

The mix call accepts Groceries and returns Done (compare this with the return type of the commands we've just defined). The implementation of the service is also reasonably concise because it delegates state management to ChefPersistentEntity:

class ChefServiceImpl(persistentEntities: PersistentEntityRegistry,
                      as: ActorSystem) extends ChefService {

  private lazy val entity = wire[ChefPersistentEntity]
  persistentEntities.register(entity)

  override def mix: ServiceCall[Groceries, Done] = ServiceCall { groceries =>
    val ref = persistentEntities.refFor[ChefPersistentEntity]("Chef")
    ref.ask(MixCommand(groceries))
  }

  override def resultsTopic: Topic[Dough] = ???
}

First, we need to pass two dependencies, PersistentEntityRegistry and ActorSystem. We pass the actor system, as, to ChefPersistentEntity at the moment of wiring and use the persistent entity registry to register our, well, persistent entity as required by Lagom. The mix call then just uses the registry to look up a reference to the entity and uses an ask pattern to send it an incoming command and get a response the same way we did with actors.

We're omitting the implementation of resultsTopic for now to focus on the persistence aspect of the service.

ChefPersistentEntity is a bit longer, so let's take a look at it in smaller chunks. We start with overriding Lagom's PersistentEntity:

final class ChefPersistentEntity(
  persistentEntities: PersistentEntityRegistry, as: ActorSystem
) extends PersistentEntity { ... }

The persistent entity can be accessed from anywhere in the cluster. Because of this, using persistence in Lagom automatically means using clustering (which is definitely a good idea). The persistent entity needs to override a few fields:

override type Command = ChefCommand
override type Event = ChefEvent
override type State = ChefState
override def initialState: ChefState = MixingState(Nil)

The Command, Event, and State types refer to those we defined earlier. We also define the initial state to be an empty MixingState.

For simplicity, we won't implement the mixing behavior in its full complexity as we already did this three times in previous chapters. Instead, we'll mock it:

private def dough(g: Groceries) = {
  import g._
  Dough(eggs * 50 + flour + sugar + chocolate)
}

Now we can finally define the behavior of our entity which will accept commands, persist events, and modify states. Again, this is done similarly to how we did in the previous chapter, but Lagom adds its five cents by providing an Actions constructor, which allows us to define command and event handlers in a builder-like manner:

Actions()
  .onCommand[MixCommand, Done] {
    case (MixCommand(groceries), ctx, _) if groceries.eggs <= 0 =>
      ctx.invalidCommand(s"Need at least one egg but got: $groceries")
      ctx.done

    case (MixCommand(groceries), ctx, _) =>
      val id = UUID.randomUUID()
      ctx.thenPersist(Mixing(id, groceries)) { evt =>
        as.scheduler.scheduleOnce(mixingTime)(
          thisEntity.ask(DoneCommand(id)))
        ctx.reply(Done)
      }
  }

The command handler must return a Persist directive, which describes what should be persisted with an optional callback for side-effecting code, which should be executed after an event was successfully written to the storage.

In the preceding snippet, we're handling two cases of MixCommand. A MixCommand that does not contain at least one egg is marked as invalid (which is modelled by sending InvalidCommandException to the caller), and calling ctx.done returns PersistNone, meaning that nothing needs to be persisted.

The second handler is for valid commands. With it, we first generate a random id for the event we're going to persist, then we construct the event and return PersistOne with the event and a callback. The callback schedules a DoneCommand to be sent to the persistent entity itself after mixingTime, signalling that the mixing is done, and sends Done back to the caller.
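Stripped of the Lagom machinery, the decision made by these two cases is a pure function from a command to either a rejection or a single event. The following plain-Scala sketch shows only that decision, under the assumption that Groceries has four Int fields; MixCommandSketch and onMix are hypothetical names, and Either stands in for the invalid-command reply and the PersistOne directive.

```scala
import java.util.UUID

// Hypothetical plain-Scala model of the MixCommand decision; not Lagom API.
object MixCommandSketch {
  final case class Groceries(eggs: Int, flour: Int, sugar: Int, chocolate: Int)
  final case class Mixing(id: UUID, groceries: Groceries)

  // Left: the command is rejected and nothing is persisted.
  // Right: exactly one Mixing event, with a freshly generated id, is to be persisted.
  def onMix(groceries: Groceries): Either[String, Mixing] =
    if (groceries.eggs <= 0) Left(s"Need at least one egg but got: $groceries")
    else Right(Mixing(UUID.randomUUID(), groceries))
}
```

The side effects (scheduling the DoneCommand and replying to the caller) are deliberately left out of the sketch, because in the entity they belong to the callback that runs after the event has been persisted.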

In order to be able to dereference an entity, we need to use a registry as we did in the service earlier:

lazy val thisEntity = persistentEntities.refFor[ChefPersistentEntity](this.entityId)

Please note that our persistence callbacks are only executing side effects and not modifying the state. For the state modifications, another constructor, onEvent, should be used. This separation is done in order to gain the possibility to reconstruct the state during the recovery as many times as required, but to have side effects executed only once after the actual event had happened and was persisted:

Actions()
  .onCommand[MixCommand, Done] { ... }
  .onEvent {
    case (m: Mixing, state) =>
      MixingState(state.batches :+ m)

    case (MixingDone(id, _), state) =>
      MixingState(state.batches.filterNot(_.id == id))
  }

Here, we've just put new mixing jobs into the queue and we remove them from the queue after they are complete. Now we have to define how to react to the DoneCommand our entity sends to itself:

Actions()
  .onCommand[MixCommand, Done] { ... }
  .onEvent { ... }
  .onCommand[DoneCommand, Done] {
    case (DoneCommand(id), ctx, state) =>
      state.batches
        .find(_.id == id)
        .map { g =>
          ctx.thenPersist(MixingDone(id, dough(g.groceries))) {
            _ => ctx.reply(Done)
          }
        }
        .getOrElse(ctx.done)
  }

We look in the current state for the Mixing event we created before, using id as the equality criterion, just to get a reference to its groceries. The groceries are used to produce the Dough carried by the MixingDone event, which we'll read later on the read side. Then we construct and persist the event and reply with Done to make the compiler happy. If no batch with the given id is found, ctx.done indicates that nothing needs to be persisted. You probably noticed that we haven't defined any side effects for the MixingDone event. We don't need to, because these events will be streamed to the resultsTopic we specified earlier.
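The lookup itself is ordinary Option handling and can be tried in isolation. Here is a minimal plain-Scala sketch, assuming Int fields for Groceries and Dough; DoneCommandSketch and onDone are hypothetical names, and the returned Option stands in for the choice between persisting one event and ctx.done.

```scala
import java.util.UUID

// Hypothetical plain-Scala model of the DoneCommand lookup; not Lagom API.
object DoneCommandSketch {
  final case class Groceries(eggs: Int, flour: Int, sugar: Int, chocolate: Int)
  final case class Dough(weight: Int)
  final case class Mixing(id: UUID, groceries: Groceries)
  final case class MixingDone(id: UUID, dough: Dough)

  // The same mock formula as in the chapter.
  def dough(g: Groceries): Dough = {
    import g._
    Dough(eggs * 50 + flour + sugar + chocolate)
  }

  // Some(event) if a batch with this id is in progress, None if there is nothing to persist.
  def onDone(batches: List[Mixing], id: UUID): Option[MixingDone] =
    batches.find(_.id == id).map(m => MixingDone(id, dough(m.groceries)))
}
```

For example, with a single batch of Groceries(2, 100, 50, 20) in progress, onDone with the matching id yields a MixingDone carrying Dough(270), while an unknown id yields None.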

To conclude the implementation of Chef, we need to wire all components together. ChefLoader is not any different from other loaders we've defined so far. In contrast, ChefApplication deviates a bit:

abstract class ChefApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with CassandraPersistenceComponents
    with LagomKafkaComponents {
  override lazy val lagomServer: LagomServer = serverFor[ChefService](wire[ChefServiceImpl])
  override lazy val jsonSerializerRegistry = new JsonSerializerRegistry {
    override def serializers = ChefModel.serializers
  }
}

We need to provide an implementation of JsonSerializerRegistry for Lagom to be able to pick up our serializers. Our application also needs to extend CassandraPersistenceComponents as we're using persistence and also LagomKafkaComponents—by publishing our events we're effectively using messaging as well. Unfortunately, currently Lagom can't check at compile time whether messaging is used by the application, so it is easy to forget to extend Kafka components, which will lead to runtime errors at the moment the application is started. 

We have defined the persistence side of the Chef service's MixingDone events, so let's turn to the messaging side.
