Reactive applications are a good fit for messaging and event-streaming technologies. So far we have mostly looked at services that expose HTTP APIs. But although HTTP is a versatile and effective protocol for interacting with a service, it should not be the only choice.
There are several options for integrating Vert.x-based services using messaging and event streaming. This chapter introduces AMQP message brokers and Apache Kafka. We will also discuss sending email using an SMTP server.
In this chapter we’ll dive into the implementation of the ingester and congratulation services. The ingester receives step updates from devices over HTTP and AMQP, and it forwards them into the system as Kafka events. The congratulation service listens for certain Kafka events to spot when a user has reached 10,000 steps in a day, and it sends a congratulation email.
HTTP is a sensible choice as a networked interface for an event-driven service, especially when a service offers an API. Messaging and event-streaming middleware offer useful tools for decoupling and integrating services. They are also typically better suited than HTTP for exchanging lots of events between services.
Vert.x provides clients for message brokers, event streaming with Apache Kafka, and a general-purpose TCP protocol for the event bus.
Messaging middleware can be more effective than HTTP for service-to-service communications with better throughput, and it can also provide durability guarantees when a consumer or producer service is temporarily unavailable. Vert.x provides several modules for doing integration work with messaging middleware:
AMQP is a standard protocol for messaging middleware, and it’s implemented by a large number of brokers such as Apache ActiveMQ , JBoss A-MQ , Windows Azure Service Bus, RabbitMQ , and more. Vert.x provides a dedicated client for RabbitMQ and its extensions. Note that it is also possible to use the Vert.x AMQP client with RabbitMQ , since it exposes an AMQP server alongside the RabbitMQ-specific server.
STOMP is a text-based protocol for messaging middleware. It has fewer features than AMQP, but they may be enough for simple messaging. It is supported by popular message brokers.
MQTT is a protocol designed for machine-to-machine publish/subscribe interactions. It is quite popular for embedded/Internet of Things devices because it uses low bandwidth.
Vert.x provides support for Apache Kafka, a popular implementation of event-streaming middleware.
At first glance, event-streaming middleware resembles messaging systems, but it allows for interesting architectural patterns because different services can consume the same set of events at their own pace. Message brokers support publish/subscribe mechanisms for multiple services to consume the same events, but event-streaming middleware also has the ability to replay events at will. Rewinding event streams is a distinctive feature. Event-streaming middleware also allows new services to be plugged into the processing pipeline without impacting other services.
You can use event-streaming middleware just like messaging middleware, but there is more to it than just passing events between services.
Last but not least, Vert.x provides an event-bus bridge over a simple TCP protocol, with binding in JavaScript, Go, C#, C, and Python. This allows us to use the event bus to connect with non-Java applications. We will not cover this event-bus bridge in the book, but you can easily learn how to use it from the official Vert.x documentation. From the Vert.x side, this is really just the event bus, except that some of the events can be produced and consumed outside of the JVM.
The 10k steps challenge application allows us to explore AMQP for messaging, Kafka for event streaming, and sending email with an SMTP server:
AMQP is used by the ingestion service because it receives pedometer device updates over either HTTP or AMQP.
Kafka is used to convey events between many services of the application.
As in the previous chapter, Docker Compose can be used to start the required middleware services for local development purposes: Apache Kafka (which also requires Apache ZooKeeper), Apache ActiveMQ Artemis, and MailHog (a test-friendly SMTP server). You can, of course, install and run each service by yourself if you want to, but starting disposable containers with Docker offers a simplified development experience.
On the Vert.x side, we’ll use the following modules to build our services:
The Advanced Message Queuing Protocol (AMQP) is a widely used network protocol for messaging middleware backed by an open specification. The protocol itself is binary, based on TCP, and it supports authentication and encryption. We’ll use Apache ActiveMQ in the project, and it supports AMQP.
Message brokers are a classic form of service integration, as they typically support message queues and publish/subscribe communications. They allow services to communicate through message passing, and the broker ensures message durability.
Figure 9.1 shows the interactions between a device, an AMQP queue that collects step events, and the ingestion service.
Messages can be made durable, so that they are not lost if the broker crashes. Producers and consumers can use acknowledgements to ensure that a message has been properly sent or retrieved and then processed. Brokers also offer various quality-of-service features, such as expiration dates and advanced routing. Depending on the broker, messages can be transformed from one representation to another, such as converting from a binary format to JSON. Some brokers also support aggregating multiple messages into one, or conversely splitting one message to produce many.
Note If you are new to ActiveMQ, I suggest reading ActiveMQ in Action by Bruce Snyder, Dejan Bosanac, and Rob Davies (Manning, 2011).
Apache Kafka is event-streaming middleware based on distributed logs. While that may sound complicated, all you really need to understand is that Kafka offers streams of event records, where producers can append new records and consumers can walk back and forth along streams. For instance, incoming pedometer step updates form a stream where each event is an update sent by a device, and the ingestion service produces these events. On the other hand, various consumers can look at the events on that stream to populate databases, compute statistics, and so on. Events remain in a stream for some amount of time, or until the stream is too big and has to discard its oldest records.
Kafka supports publish/subscribe interactions between distributed services, as illustrated in figure 9.2. In a Kafka cluster, events are published and consumed from topics that group related events. Topics are split into replicated partitions, which are ordered sequences of events. Each event is identified by its offset position in the event log that materializes its partition.
Consumers pull events from partitions. They can keep track of the last offset that they have consumed, but it is also possible to arbitrarily seek any random position in a partition, or even to replay all events since the beginning. Also, consumer groups can divide work by reading from different partitions and parallelizing event processing.
It is easy to think that Kafka is a messaging system like ActiveMQ, and in some cases Kafka is very fine messaging middleware, but it should still be considered streaming middleware.
In a message broker, messages disappear when they have been consumed from a queue, or when they expire. Kafka partitions eventually evict records, either using a partition size limit (such as 2 GB) or using some eviction delay (such as two weeks). Kafka records should be considered “semi-durable” as they will eventually disappear. It is possible to configure the partitions in a topic to keep events forever, but this is quite rare as events are expected to produce durable effects when consumed. For instance, the ingestion service produces incoming step update records, and the activity service turns these records into long-term facts in a database. Another interesting feature of Kakfa is that topics can be replayed at will, so new services can join and consume a stream at their own pace.
Note I suggest reading Dylan Scott’s Kafka in Action (Manning, 2017) if you are new to Apache Kafka.
Let’s now dive into the ingestion service.
Everything begins with the ingestion service, as it receives step count updates from the pedometers. In our (fictitious) application, we can expect that several types of pedometers will be available, and that they have different communication capabilities. For example, some devices may directly talk to the ingestion service over the internet, while others may need to reach a gateway that forwards updates to the ingestion service.
This is why we offer two interfaces for ingesting device updates:
A device can connect to the HTTP API provided by the ingestion service.
A device can forward an update to a message broker, and the ingestion service receives the updates from the broker.
Once an update has been received, it must be validated and then sent to a Kafka topic. It is interesting to explore both the AMQP and HTTP interfaces, as we can see similarities in their implementations but also differences in acknowledging device updates.
We’ll start with the AMQP ingestion. We first need to create an AMQP client that connects to the broker. The following listing shows the client configuration code.
private AmqpClientOptions amqpConfig() { return new AmqpClientOptions() .setHost("localhost") .setPort(5672) .setUsername("artemis") ❶ .setPassword("simetraehcapa"); } // (...) AmqpClientOptions amqpOptions = amqpConfig(); AmqpReceiverOptions receiverOptions = new AmqpReceiverOptions() .setAutoAcknowledgement(false) ❷ .setDurable(true); ❸
❶ The credentials are the default ones from the Docker image.
❷ We will manually acknowledge incoming messages.
The amqpConfig
method that we use here provides a configuration with hardcoded values. This is great for the testing we do in this book, but for production you would, of course, resolve credentials, hostnames, and port numbers from some external source. These could be environment variables or a registry service, such as Apache ZooKeeper or Consul. We also set up the connection for durable messaging and declare manual acknowledgment, as we want to retry message processing if writing to a Kafka topic fails.
The next step is to set up the event-processing pipeline for incoming AMQP messages. We use RxJava to dispatch messages to a processing function, log errors, and recover from errors, as shown in the following listing.
AmqpClient.create(vertx, amqpOptions) ❶ .rxConnect() .flatMap(conn -> conn.rxCreateReceiver("step-events", receiverOptions)) ❷ .flatMapPublisher(AmqpReceiver::toFlowable) ❸ .doOnError(this::logAmqpError) ❹ .retryWhen(this::retryLater) ❺ .subscribe(this::handleAmqpMessage); ❻
❷ Create a message receiver from the step-events destination.
❸ Create a Flowable of AMQP messages.
❻ Subscription that dispatches incoming messages
This pipeline is interesting as it is purely declarative. It starts with the creation of a client, and then it obtains a receiver for the step-events
durable queue and a flow of messages. From there we declare what to do upon receiving a message or an error. We also keep the code short and clean by using Java method references rather than lambdas. But what do the logAmqpError
, retryLater
, and handleAmqpMessage
methods do?
Logging messages is not very complicated.
private void logAmqpError(Throwable err) {
logger.error("Woops AMQP", err); ❶
}
❶ Log the error and stack trace.
Errors happen. For instance, we can lose the connection to the AMQP broker. In this case, an error passes along the pipeline, and logAmqpError
logs it, but doOnError
lets the error propagate to subscribers.
We then need to retry connecting to the AMQP broker and resume receiving events, which translates to resubscribing to the source in RxJava. We can do that with the retryWhen
operator, as it allows us to define our own policy. If you just want to retry a number of times, or even always, then retry
is simpler. The following listing shows how we introduce a 10-second delay before resubscribing.
private Flowable<Throwable> retryLater(Flowable<Throwable> errs) {
return errs.delay(10, TimeUnit.SECONDS, RxHelper.scheduler(vertx)); ❶
}
❶ It is important to use the scheduler parameter to process events on a Vert.x event loop.
The retryLater
operator works as follows:
To delay the resubscription by 10 seconds, we use the delay
operator. It will eventually emit a value, so onNext
will be called and a resubscription will happen. You can, of course, think of more elaborate handlers, like limiting the number of retries or using an exponential back-off strategy. We will use this pattern a lot, as it greatly simplifies the error-recovery logic.
The following listing contains the method that handles incoming AMQP messages, validates them, and then pushes them as Kafka records.
private void handleAmqpMessage(AmqpMessage message) { if (!"application/json".equals(message.contentType()) || ➥ invalidIngestedJson(message.bodyAsJsonObject())) { ❶ logger.error("Invalid AMQP message (discarded): {}", ➥ message.bodyAsBinary()); message.accepted(); return; } JsonObject payload = message.bodyAsJsonObject(); KafkaProducerRecord<String, JsonObject> record = makeKafkaRecord(payload);❷ updateProducer.rxSend(record).subscribe( ok -> message.accepted(), ❸ err -> { logger.error("AMQP ingestion failed", err); message.rejected(); ❹ }); }
❶ Check for a valid JSON message.
❸ Acknowledge the AMQP message.
The handleAmqpMessage
method first performs some validation on the incoming AMQP message and then prepares a Kafka record. The AMQP message is acknowledged when the Kafka record is written, and it is rejected if the record could not be written.
tip In listing 9.5 and all subsequent services, we will directly work with JsonObject
data representation. There is little point in converting the JSON representation to Java domain classes (such as an IngestionData
class) given that we mostly copy and transform data. You can, of course, perform such mapping if you have to do some more complex business logic and the cost of abstraction is justified.
The invalidIngestedJson
method checks that the JSON data contains all required entries, as follows.
private boolean invalidIngestedJson(JsonObject payload) {
return !payload.containsKey("deviceId") || ❶
!payload.containsKey("deviceSync") ||
!payload.containsKey("stepsCount");
}
The makeKafkaRecord
method in the following listing converts the AMQP message JSON to a Kafka record aimed at the incoming-steps
topic.
private KafkaProducerRecord<String, JsonObject> makeKafkaRecord(JsonObject ➥ payload) { String deviceId = payload.getString("deviceId"); JsonObject recordData = new JsonObject() ❶ .put("deviceId", deviceId) .put("deviceSync", payload.getLong("deviceSync")) .put("stepsCount", payload.getInteger("stepsCount")); return KafkaProducerRecord.create("incoming.steps", deviceId, recordData);❷ }
❷ Record with key deviceId and JSON data
We could avoid copying all JSON entries manually and just pass the JSON from the AMQP message to the Kafka record. This, however, helps ensure that no extra data ends up in the Kafka record.
The updateProducer
field is of type KafkaProducer<String, JsonObject>
because it produces messages with string keys and JSON payloads. Instances of KafkaProducer
are created by passing configuration from a Map
as follows.
Map<String, String> kafkaConfig() { Map<String, String> config = new HashMap<>(); config.put("bootstrap.servers", "localhost:9092"); config.put("key.serializer", ➥ "org.apache.kafka.common.serialization.StringSerializer"); ❶ config.put("value.serializer", ➥ "io.vertx.kafka.client.serialization.JsonObjectSerializer"); ❷ config.put("acks", "1"); return config; } // (...) // in rxStart() updateProducer = KafkaProducer.create(vertx, kafkaConfig()); ❸
❶ Class to serialize values from strings
❷ Class to serialize values from Vert.x JsonObject
❸ Create a Vert.x Kafka producer.
The configuration especially specifies the serializer (or deserializer) classes, as Kafka records need to be mapped to Java types. StringSerializer
comes from the Kafka client library, and it serializes Java strings to Kafka data, whereas JsonObjectSerializer
comes from Vert.x and serializes JsonObject
data. You need to specify correct serializer classes for both your keys and values. Similarly, you will need to configure deserializers when reading from Kafka topics.
tip The Vert.x Kafka module wraps the Java client from the Apache Kafka project, and all configuration key/value pairs match those from the Kafka Java client documentation.
The code to ingest from HTTP is very similar to that of ingesting with AMQP. The most notable difference is that an HTTP status code needs to be set, so that the device that sent an update knows that ingestion has failed and must be retried later.
We first need an HTTP server and router.
Router router = Router.router(vertx);
router.post().handler(BodyHandler.create()); ❶
router.post("/ingest").handler(this::httpIngest);
return vertx.createHttpServer()
.requestHandler(router)
.rxListen(HTTP_PORT)
.ignoreElement();
❶ BodyHandler decodes HTTP request bodies.
The httpIngest
method is shown in the next listing, and it’s quite similar to handleAmqpMessage
.
private void httpIngest(RoutingContext ctx) { JsonObject payload = ctx.getBodyAsJson(); if (invalidIngestedJson(payload)) { ❶ logger.error("Invalid HTTP JSON (discarded): {}", payload.encode()); ctx.fail(400); ❷ return; } KafkaProducerRecord<String, JsonObject> record = makeKafkaRecord(payload); updateProducer.rxSend(record).subscribe( ok -> ctx.response().end(), ❸ err -> { logger.error("HTTP ingestion failed", err); ctx.fail(500); ❹ }); }
❷ Bad JSON; let the requester know that.
❹ The ingestion failed; let the requester know that.
HTTP status codes are important for letting the client know if the payload is incorrect (400), if the ingestion failed due to some (temporary) error (500), or if the ingestion succeeded (200).
The ingestion service is a good example of integration using different input protocols. Let’s now explore more of Apache Kafka with Vert.x through the congratulation service.
While the ingestion service produces Kafka events, the congratulation service consumes Kafka events.
The activity service generates daily step events whenever a device update has been received. Each event contains the number of steps recorded for the originating device on the current day. The congratulation service can observe these events as they are sent to the daily.step.updates
Kafka topic, and it can target the events where the number of steps is above 10,000.
The events sent to the daily.step.updates
Kafka topic are JSON data with the following content:
The Kafka records also have a key, which is the concatenation of several parameters: deviceId:year-month-day
. In this scheme, all records of device 1a2b
produced on October 6th 2019 have the key 1a2b:2019-10-06
. As you will shortly see, the key will be useful not just to ensure that events for a given device are consumed in order, but also to ensure that we don’t send more than one congratulation email per day.
The pipeline for processing daily steps event is shown in figure 9.3.
Daily step updates flow from the daily.step.updates
Kafka topic, and then
We discard events where the number of steps is less than 10,000.
We discard events for which an event with the same key has already been processed.
The following listing contains the corresponding RxJava pipeline.
KafkaConsumer.<String, JsonObject>create(vertx, ➥ KafkaConfig.consumerConfig("congrats-service")) .subscribe("daily.step.updates") ❶ .toFlowable() .filter(this::above10k) ❷ .distinct(KafkaConsumerRecord::key) ❸ .flatMapSingle(this::sendmail) ❹ .doOnError(err -> logger.error("Woops", err)) .retryWhen(this::retryLater) ❺ .subscribe(mailResult -> logger.info("Congratulated {}", ➥ mailResult.getRecipients())); ❻
❶ Subscribe to the Kafka topic.
❷ Filter out events with less than 10,000 steps.
❸ Discard events for which a previous event with the same key has been processed.
❹ Asynchronous operation to send an email
❻ Log each successful congratulation.
The preceding listing uses the RxJava binding to subscribe to a Kafka topic as a Flowable
for Kafka records. We then use the filter
combinator to filter out records with less than 10,000 steps, and use the predicate method in the following listing.
private boolean above10k(KafkaConsumerRecord<String, JsonObject> record) {
return record.value().getInteger("stepsCount") >= 10_000; ❶
}
The distinct
combinator in listing 9.11 ensures that only one event for each Kafka record key is retained, right after filter
. This is to avoid sending more than one congratulation email to a user on a given day, as we could easily have a first event with, say, 10,100 steps, followed later by another event with 10,600 steps, and so on. Note that this design is not 100% bulletproof, as it requires storing already-processed key values in memory, and upon a service restart we could accidentally send a second email. This is a reasonable trade-off in our example, compared to using a persistent data store just to keep track of when an email was last sent to a user.
The rest of the pipeline uses similar event processing and retryWhen
logic to resubscribe on errors. The sendmail
method is an asynchronous operation to send an email--let’s look at how it works.
The vertx-mail-client
module offers an SMTP client. The following listing shows how to create such a client.
MailClient mailClient = MailClient.createShared(vertx, MailerConfig.config());❶
As with many other Vert.x clients, we obtain an instance through a factory method, passing a Vertx
context as well as some parameters.
The MailerConfig
class provides a method to retrieve configuration data, as shown next.
class MailerConfig { static MailConfig config() { return new MailConfig() .setHostname("localhost") ❶ .setPort(1025); ❷ } }
Again, these hardcoded values are fine for testing purposes and for keeping our code simple. The values are for connecting to MailHog, the testing SMTP server that we’re using from a Docker container. The MailConfig
class supports more configuration options like SSL, authentication method, credentials, and so on.
A daily-steps update Kafka event applies to a device; it does not contain the name of the owner or the email address. Before we can send an email, we must first fetch the missing information (name and email) from the user profile service. We thus need two requests to that service:
The sendmail
method is shown in the following listing.
private Single<MailResult> sendmail(KafkaConsumerRecord<String, JsonObject> ➥ record) { String deviceId = record.value().getString("deviceId"); ❶ Integer stepsCount = record.value().getInteger("stepsCount"); return webClient .get(3000, "localhost", "/owns/" + deviceId) ❷ .as(BodyCodec.jsonObject()) .rxSend() .map(HttpResponse::body) ❸ .map(json -> json.getString("username")) ❹ .flatMap(this::getEmail) ❺ .map(email -> makeEmail(stepsCount, email)) ❻ .flatMap(mailClient::rxSendMail); ❼ }
❶ Extract the device identifier.
❷ Prepare a request to find who owns the device.
❸ Extract the body, which is a JsonObject.
❺ Asynchronous operation to fetch the email for the user
❼ Asynchronously send the email.
The sendmail
method is another RxJava pipeline that composes asynchronous operations and data processing, illustrated in figure 9.4.
It starts by issuing an HTTP request to the user profile service and finding the user name of the device owner. It then prepares another request to fetch the user profile data to get the email address. The following listing provides the implementation of the getEmail
method.
private Single<String> getEmail(String username) { return webClient .get(3000, "localhost", "/" + username) .as(BodyCodec.jsonObject()) .rxSend() ❶ .map(HttpResponse::body) .map(json -> json.getString("email")); ❷ }
❷ Keep only the email address.
The next step is to prepare an email, enclosed in a MailMessage
instance, as shown in the following implementation of the makeEmail
method.
private MailMessage makeEmail(Integer stepsCount, String email) { return new MailMessage() .setFrom("[email protected]") ❶ .setTo(email) ❷ .setSubject("You made it!") ❸ .setText("Congratulations on reaching " + stepsCount + " ➥ steps today! - The 10k Steps Team "); ❹ }
Note that for more advanced email formatting, you could use a template engine rather than text.
Now that you know how to do messaging and event streaming with Vert.x, let’s not forget integration testing, to ensure that both the ingestion and congratulation services work correctly.
Testing the ingestion service involves sending device updates over AMQP and HTTP, and observing the Kafka topics. Conversely, testing the congratulation service involves sending events to Kafka topics, and observing the emails.
Testing the ingestion service requires sending a message over AMQP or HTTP, and then checking that a Kafka record has been emitted, as shown in figure 9.5.
The IntegrationTest
class in the ingestion service source code uses JUnit 5 and Docker containers to start an AMQP broker, Apache Kafka, and Apache ZooKeeper. The following listing shows the test preparation.
@BeforeEach void setup(Vertx vertx, VertxTestContext testContext) { kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig()); ❶ amqpClient = AmqpClient.create(vertx, amqClientOptions()); ❷ KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, ➥ kafkaConfig()); ❸ vertx .rxDeployVerticle(new IngesterVerticle()) ❹ .delay(500, TimeUnit.MILLISECONDS, RxHelper.scheduler(vertx)) .flatMapCompletable(id -> ➥ adminClient.rxDeleteTopics(singletonList("incoming.steps"))) ❺ .onErrorComplete() .subscribe(testContext::completeNow, testContext::failNow); }
❹ Deploy the ingestion verticle.
❺ Delete all incoming.steps topics if they exist.
The preparation consists of deploying the IngesterVerticle
verticle, and then deleting any existing incoming.steps
topic. This ensures that tests do not pollute each other with remaining Kafka events. Note the onErrorComplete
combinator: it ensures progress, because deleting topics raises an error when they don’t exist. We want to run the tests when incoming.steps
does not exist, which is typically the case of the first test being run. Of course, onErrorComplete
can mask a deployment failure of IngesterVerticle
, but we will find that out in test executions.
The following listing shows the preamble of the test case where a well-formed AMQP message is being ingested.
@Test @DisplayName("Ingest a well-formed AMQP message") void amqIngest(VertxTestContext testContext) { JsonObject body = new JsonObject().put("deviceId", "123") .put("deviceSync", 1L).put("stepsCount", 500); amqpClient.rxConnect() ❶ .flatMap(connection -> connection.rxCreateSender("step-events")) ❷ .subscribe(sender -> { AmqpMessage msg = AmqpMessage.create() ❸ .durable(true) .ttl(5000) .withJsonObjectAsBody(body).build(); sender.send(msg); ❹ }, testContext::failNow); // (...) }
❶ Open an AMQP client connection.
❷ Create a sender to the step-events destination.
The AMQP client sends a message that we know is well-formed, as its body contains all the required JSON entries.
Once this is done, we need to check that a Kafka record has been sent, as follows.
kafkaConsumer.subscribe("incoming.steps") ❶ .toFlowable() .subscribe( record -> testContext.verify(() -> { ❷ assertThat(record.key()).isEqualTo("123"); JsonObject json = record.value(); assertThat(json.getString("deviceId")).isEqualTo("123"); assertThat(json.getLong("deviceSync")).isEqualTo(1L); assertThat(json.getInteger("stepsCount")).isEqualTo(500); testContext.completeNow(); ❸ }), testContext::failNow); ❹
❶ Subscribe to the Kafka topic.
❷ Perform assertions on the Kafka record.
Of course, we also need to test what happens when an incorrect message is sent, like an empty JSON document. We must check that no Kafka record is being emitted, as in the following listing.
@Test @DisplayName("Ingest a badly-formed AMQP message and observe no Kafka record") void amqIngestWrong(Vertx vertx, VertxTestContext testContext) { JsonObject body = new JsonObject(); ❶ // (...) ❷ kafkaConsumer.subscribe("incoming.steps") .toFlowable() .timeout(3, TimeUnit.SECONDS, RxHelper.scheduler(vertx)) ❸ .subscribe( record -> testContext.failNow(new ➥ IllegalStateException("We must not get a record")), err -> { if (err instanceof TimeoutException) { ❹ testContext.completeNow(); } else { testContext.failNow(err); } }); }
❷ Send it (same code as in listing 9.20)
❹ Check that this is the error we expected!
The timeout in the RxJava pipeline is important, as we need to let some time lapse to be sure that no Kafka record has been sent. The remainder of the IntegrationTest
class is quite similar, with two test cases for the HTTP ingestion: one that checks what happens when a correct payload is sent, and one where the payload is an empty JSON document.
Testing the behavior of the congratulation service is more involved than the ingestion, as there are more moving parts in the test environment, as illustrated in figure 9.6.
The goal is to send Kafka records and then observe the emails that have been sent (or not sent). Interestingly, MailHog is not just an SMTP server; it also provides a web interface and an HTTP API to simulate an email inbox. This allows us to perform tests by sending Kafka records, and then checking what emails have been received in the inbox.
The CongratsTest
class features a prepare
initialization method that creates a Kafka producer (to send Kafka events) and a Vert.x web client (to query the inbox). The steps in the prepare
method to prepare the environment are shown in the following listing.
KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, conf); adminClient .rxDeleteTopics(Arrays.asList("incoming.steps", "daily.step.updates")) ❶ .onErrorComplete() .andThen(vertx.rxDeployVerticle(new CongratsVerticle())) ❷ .ignoreElement() .andThen(vertx.rxDeployVerticle(new FakeUserService())) ❸ .ignoreElement() .andThen(webClient.delete(8025, "localhost", "/api/v1/messages").rxSend())❹ .ignoreElement() .subscribe(testContext::completeNow, testContext::failNow);
❸ Deploy a mock user account service.
❹ Delete all messages from the inbox.
We first delete existing Kafka topics, and then we deploy the verticle under test. We also deploy a verticle to mock the user profile service and delete all messages from the inbox by making an HTTP DELETE
query to the MailHog instance.
The FakeUserService
verticle found in the test source exposes an HTTP service with the minimal level of functionality to replace the real user profile service in our tests. All requests to find out who owns a device point to user Foo
, and retrieving the details of user Foo
gives just the username and email. The following listing shows an excerpt with the code for answering a user details request with information for user Foo
and just the JSON entries needed for CongratsVerticle
to operate.
router.get("/:username").handler(this::username); ❶ //(...) private void username(RoutingContext ctx) { logger.info("User data request {}", ctx.request().path()); JsonObject notAllData = new JsonObject() ❷ .put("username", "Foo") .put("email", "[email protected]"); ctx.response() .putHeader("Content-Type", "application/json") .end(notAllData.encode()); }
❶ Route for a user profile info
❷ JSON with just the required data for the service and test
This way we have good isolation of the congratulation service for testing. We could also have deployed the real user profile service, but that would have involved preparing a database with some data. It is always better to replace dependent services with mock ones when you can.
The next listing shows the full test case for checking that no email is sent on a Kafka record with less than 10,000 steps.
@Test @DisplayName("No email must be sent below 10k steps") void checkNothingBelow10k(Vertx vertx, VertxTestContext testContext) { producer .rxSend(record("123", 5000)) ❶ .ignoreElement() .delay(3, TimeUnit.SECONDS, RxHelper.scheduler(vertx)) ❷ .andThen(webClient .get(8025, "localhost", "/api/v2/search?kind=to&[email protected]") ❸ .as(BodyCodec.jsonObject()).rxSend()) .map(HttpResponse::body) .subscribe( json -> { testContext.verify(() -> ➥ assertThat(json.getInteger("total")).isEqualTo(0)); ❹ testContext.completeNow(); }, testContext::failNow); }
❶ Kafka record for device 123 and 5000 steps
❷ Wait for three seconds after the message has been sent.
❸ Query all messages for email [email protected].
❹ Check that there is no message.
The MailHog API allows us to check what messages have been sent. The next listing checks whether an email was sent for more than 10,000 steps.
producer .rxSend(record("123", 11_000)) ❶ .ignoreElement() .delay(3, TimeUnit.SECONDS, RxHelper.scheduler(vertx)) .andThen(webClient .get(8025, "localhost", "/api/v2/search?kind=to&[email protected]") .as(BodyCodec.jsonObject()).rxSend()) .map(HttpResponse::body) .subscribe( json -> { testContext.verify(() -> ➥ assertThat(json.getInteger("total")).isEqualTo(1)); ❷ testContext.completeNow(); }, testContext::failNow);
The last test case in the checkNotTwiceToday
method checks that only one email was sent for two successive records with more than 10,000 steps. I haven’t reproduced the code here due to its verbosity, but you can get it from the book’s source code repository.
This concludes the design, implementation, and testing of two services that use messaging and event streaming. The next chapter focuses on Vert.x and data sources.
AMQP is a standard protocol for message brokers, and you saw how to consume and produce AQMP messages with Vert.x and Apache ActiveMQ.
Apache Kafka is event-streaming middleware that allows services to replay events at will. Vert.x provides efficient integration with Kafka.
RxJava allows you to write event-processing pipelines in a declarative fashion, and with built-in error recovery.
We explored strategies for writing integration tests with AMQP, Kafka, and test containers by sending messages from tests to replace external components.
MailHog is a test-friendly SMTP server that exposes a convenient API for inspecting what emails have been sent.
13.58.121.131