The persistent actor

The persistent actor in Akka extends the normal Actor and mixes in a PersistentActorPersistentActor implements the receive method but requires a few other methods to be implemented by us:

class InventoryActor extends Actor with PersistentActor {
private var inventory: Inventory = Inventory(Map.empty)

override def persistenceId: String = InventoryActor.persistenceId

override def receiveRecover: Receive = ???

override def receiveCommand: Receive = ???
}

Besides the inventory that we need as a representation of state, we need to define a unique persistenceId and two methods: receiveRecover and receiveCommand. The former is called during the recovery time, for example at startup or if the persistent actor is restarted, and it receives all events from the journal. It is expected to modify the internal state but not to execute any side-effects. The latter is called during the normal lifetime and it receives all the commands. It is supposed to convert valid commands to events, persist the events, modify the internal state, and execute side-effecting code after that.

In our example, receiveRecover just delegates the event processing to inventory:

override def receiveRecover: Receive = {
case SnapshotOffer(_, snapshot: Inventory) => inventory = snapshot
case event: Event => inventory = inventory.update(event)
case RecoveryCompleted => saveSnapshot(inventory)
}

Additionally, it handles instances of SnapshotOffer by restoring the inventory as a whole from the latest snapshot. SnapshotOffer will be the first message the actor receives if there are snapshots available, and it will contain the latest snapshot so it is safe to restore the inventory from it. The events in the journal before the snapshot will not be replayed. Finally, after receiving the RecoveryCompleted event, we save the current state as a snapshot for use after the next restart.

The receiveCommand implementation is a bit more involved:

override def receiveCommand: Receive = {
case GetInventory =>
sender() ! inventory

case cmd: Command =>
inventory.canUpdate(cmd) match {
case None =>
sender() ! None
case Some(event) =>
persistAsync(event) { ev =>
inventory = inventory.update(ev)
sender() ! Some(ev)
}
}
}

We handle the GetInventory query by sending a current state to the sender. Inventory is a wrapper over an immutable map, so it is safe to share.

We handle all Commands the same way, by letting Inventory do the actual work. If a command cannot be applied, we respond to the sender with None. In the opposite case, we asynchronously persist corresponding events and provide a callback that will be executed after the event is persisted. In the callback, we apply the event to the internal state and send the new state to the sender. In contrast to the normal actor, it is safe to use sender() in an async block.

And this is it, we now have a persistent actor that will restore its state after the restart. Time to make it available for HTTP clients.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.118.20.231