Configuring Akka Persistence

Akka Persistence allows us to store and replay messages sent to PersistentActor and thus implements an event-sourcing approach. Before going into the details of the actors implementation, let's look at the arrangements we need to make in the project configuration. 

We're going to use the H2 relational database for this project. Akka Persistence supports many different storage plugins, including a local filesystem for storing snapshots, and in our case, it appears to be a good idea to use the same database we used with doobie to underline the differences in the architectural style.

Again, we're using Flyway to create the structure of the database. The tables will be different though. This is the table that will store events:

CREATE TABLE IF NOT EXISTS PUBLIC."journal" (
"ordering" BIGINT AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"deleted" BOOLEAN DEFAULT FALSE,
"tags" VARCHAR(255) DEFAULT NULL,
"message" BYTEA NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
);

persistence_id is an ID of a specific persistent actor, which needs to be unique for the whole actor system (we'll see in a minute how this maps to the code), the tags field holds tags assigned to the events (this makes constructing views easier). message holds an event in serialized form. The serialization mechanism is decoupled from the storage. Akka supports different flavours, including Java serialization, Google Protobuf, Apache Thrift, or Avro and JSON. We'll use the JSON format in order to keep the example small.

The snapshots table is even simpler:

CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"created" BIGINT NOT NULL,
"snapshot" BYTEA NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
);

Basically, it's just a snapshot in serialized form, with a timestamp and the persistence_id of the actor it belongs to.

With these tables in the migrations file, we now need to add following dependencies to build.sbt

"com.typesafe.akka"   %% "akka-persistence"       % akkaVersion,
"com.github.dnvriend" %% "akka-persistence-jdbc" % akkaPersistenceVersion,
"com.scalapenos" %% "stamina-json" % staminaVersion,
"com.h2database" % "h2" % h2Version,
"org.flywaydb" % "flyway-core" % flywayVersion,

The akka-persistence dependency is obvious. akka-persistence-jdbc is an implementation of the JDBC storage for the h2 database. Flyway-core is used to set up the database like in the previous example. stamina-json allows for schema migrations—it gives us a way to describe how the events stored in the old format in the database should be converted to the new format used in the code if needed. 

We also need to put quite a bit of configuration for the Akka persistence in application.conf to configure journals. This configuration is quite verbose, so we will not discuss it here in full, but we will take a look at one part of it that describes serialization:

akka.actor {
serializers.serializer = "ch14.EventSerializer"
serialization-bindings {
"stamina.Persistable" = serializer
}
}

Here, we configure serialization for the stamina. Let's take a look at EventSerializer:

class EventSerializer
extends stamina.StaminaAkkaSerializer(v1createdPersister,
v1deletedPersister,
v1purchasedPersister,
v1restockedPersister,
v1inventoryPersister)

Here, we tell stamina which serializers to use. The serializers are defined as follows:

import stamina.json._

object
PersistenceSupport extends JsonSupport {
val v1createdPersister = persister[ArticleCreated]("article-created")
val v1deletedPersister = persister[ArticleDeleted]("article-deleted")
val v1purchasedPersister = persister[ArticlesPurchased]("articles-purchased")
val v1restockedPersister = persister[ArticlesRestocked]("articles-restocked")
val v1inventoryPersister = persister[Inventory]("inventory")
}

In the PersistenceSupport object, we define persisters for our events. We don't need any migrations yet, but in the case we would, the migrations would be described here. Persister requires implicit RootJsonFormat to be available and we provide them in the JsonSupport trait:

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.{DefaultJsonProtocol, RootJsonFormat}
import DefaultJsonProtocol._

trait JsonSupport extends SprayJsonSupport {
implicit val invJF: RootJsonFormat[Inventory] =
jsonFormat1(Inventory)

implicit val createArticleJF = jsonFormat2(CreateArticle)
implicit val deleteArticleJF = jsonFormat1(DeleteArticle)
implicit val purchaseJF = jsonFormat1(PurchaseArticles)
implicit val restockJF = jsonFormat1(RestockArticles)

implicit val createdJF = jsonFormat2(ArticleCreated)
implicit val deletedJF = jsonFormat1(ArticleDeleted)
implicit val pJF = jsonFormat1(ArticlesPurchased)
implicit val reJF = jsonFormat1(ArticlesRestocked)
}

We extend SprayJsonSupport and import DefaultJsonProtocol._ to get implicit formats for basic types already defined by spray-json. Then we define RootJsonFormat for all of our commands (these formats will be used by the API layer to un-marshall request bodies), events (which will be used by both the API layer to marshall responses, and the persistence layer to serialize events), and an Inventory (which is required for snapshots to be serializable). Here we're not relying on circe's auto-derivation and hence describe each case class individually.

Now we have persisters and formats for the model, but what is that model? It reflects the event-sourcing approach!

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.224.56.29