9 Messaging and event streaming with Vert.x

This chapter covers

  • Messaging with AMQP
  • Event streaming with Apache Kafka
  • Sending emails
  • Integration testing with messaging and event-streaming middleware

Reactive applications are a good fit for messaging and event-streaming technologies. So far we have mostly looked at services that expose HTTP APIs. But although HTTP is a versatile and effective protocol for interacting with a service, it should not be the only choice.

There are several options for integrating Vert.x-based services using messaging and event streaming. This chapter introduces AMQP message brokers and Apache Kafka. We will also discuss sending email using an SMTP server.

In this chapter we’ll dive into the implementation of the ingester and congratulation services. The ingester receives step updates from devices over HTTP and AMQP, and it forwards them into the system as Kafka events. The congratulation service listens for certain Kafka events to spot when a user has reached 10,000 steps in a day, and it sends a congratulation email.

9.1 Event-driven services beyond HTTP with Vert.x

HTTP is a sensible choice as a networked interface for an event-driven service, especially when a service offers an API. Messaging and event-streaming middleware offer useful tools for decoupling and integrating services. They are also typically better suited than HTTP for exchanging lots of events between services.

9.1.1 What Vert.x provides

Vert.x provides clients for message brokers, event streaming with Apache Kafka, and a general-purpose TCP protocol for the event bus.

Message brokers

Messaging middleware can be more effective than HTTP for service-to-service communication, offering better throughput, and it can also provide durability guarantees for when a consumer or producer service is temporarily unavailable. Vert.x provides several modules for doing integration work with messaging middleware:

  • An Advanced Message Queuing Protocol (AMQP) client

  • A Simple Text Oriented Messaging Protocol (STOMP) client

  • A RabbitMQ client

  • A Message Queuing Telemetry Transport (MQTT) client

AMQP is a standard protocol for messaging middleware, and it's implemented by a large number of brokers such as Apache ActiveMQ, JBoss A-MQ, Windows Azure Service Bus, RabbitMQ, and more. Vert.x provides a dedicated client for RabbitMQ and its extensions. Note that it is also possible to use the Vert.x AMQP client with RabbitMQ, since it exposes an AMQP server alongside the RabbitMQ-specific server.

STOMP is a text-based protocol for messaging middleware. It has fewer features than AMQP, but they may be enough for simple messaging. It is supported by popular message brokers.

MQTT is a protocol designed for machine-to-machine publish/subscribe interactions. It is quite popular for embedded/Internet of Things devices because it uses low bandwidth.

Kafka event streaming

Vert.x provides support for Apache Kafka, a popular implementation of event-streaming middleware.

At first glance, event-streaming middleware resembles messaging systems, but it allows for interesting architectural patterns because different services can consume the same set of events at their own pace. Message brokers support publish/subscribe mechanisms for multiple services to consume the same events, but event-streaming middleware also has the ability to replay events at will. Rewinding event streams is a distinctive feature. Event-streaming middleware also allows new services to be plugged into the processing pipeline without impacting other services.

You can use event-streaming middleware just like messaging middleware, but there is more to it than just passing events between services.

Event-bus TCP bridge

Last but not least, Vert.x provides an event-bus bridge over a simple TCP protocol, with bindings in JavaScript, Go, C#, C, and Python. This allows us to use the event bus to connect with non-Java applications. We will not cover this event-bus bridge in the book, but you can easily learn how to use it from the official Vert.x documentation. From the Vert.x side, this is really just the event bus, except that some of the events can be produced and consumed outside of the JVM.

9.1.2 The middleware and services that we’ll use

The 10k steps challenge application allows us to explore AMQP for messaging, Kafka for event streaming, and sending email with an SMTP server:

  • AMQP is used by the ingestion service because it receives pedometer device updates over either HTTP or AMQP.

  • Kafka is used to convey events between many services of the application.

  • SMTP is used to send congratulation emails to users.

As in the previous chapter, Docker Compose can be used to start the required middleware services for local development purposes: Apache Kafka (which also requires Apache ZooKeeper), Apache ActiveMQ Artemis, and MailHog (a test-friendly SMTP server). You can, of course, install and run each service by yourself if you want to, but starting disposable containers with Docker offers a simplified development experience.

On the Vert.x side, we’ll use the following modules to build our services:

  • vertx-amqp-client--The AMQP client

  • vertx-kafka-client--The Apache Kafka client

  • vertx-mail-client--The SMTP client that will send emails

9.1.3 What is AMQP (and a message broker)?

The Advanced Message Queuing Protocol (AMQP) is a widely used network protocol for messaging middleware backed by an open specification. The protocol itself is binary, based on TCP, and it supports authentication and encryption. We'll use Apache ActiveMQ Artemis in the project, and it supports AMQP.

Message brokers are a classic form of service integration, as they typically support message queues and publish/subscribe communications. They allow services to communicate through message passing, and the broker ensures message durability.

Figure 9.1 shows the interactions between a device, an AMQP queue that collects step events, and the ingestion service.

Figure 9.1 Overview of an AMQP queue

Messages can be made durable, so that they are not lost if the broker crashes. Producers and consumers can use acknowledgements to ensure that a message has been properly sent or retrieved and then processed. Brokers also offer various quality-of-service features, such as expiration dates and advanced routing. Depending on the broker, messages can be transformed from one representation to another, such as converting from a binary format to JSON. Some brokers also support aggregating multiple messages into one, or conversely splitting one message to produce many.

Note If you are new to ActiveMQ, I suggest reading ActiveMQ in Action by Bruce Snyder, Dejan Bosanac, and Rob Davies (Manning, 2011).

9.1.4 What is Kafka?

Apache Kafka is event-streaming middleware based on distributed logs. While that may sound complicated, all you really need to understand is that Kafka offers streams of event records, where producers can append new records and consumers can walk back and forth along streams. For instance, incoming pedometer step updates form a stream where each event is an update sent by a device, and the ingestion service produces these events. On the other hand, various consumers can look at the events on that stream to populate databases, compute statistics, and so on. Events remain in a stream for some amount of time, or until the stream is too big and has to discard its oldest records.

Kafka supports publish/subscribe interactions between distributed services, as illustrated in figure 9.2. In a Kafka cluster, events are published and consumed from topics that group related events. Topics are split into replicated partitions, which are ordered sequences of events. Each event is identified by its offset position in the event log that materializes its partition.

Figure 9.2 Overview of a Kafka topic

Consumers pull events from partitions. They can keep track of the last offset that they have consumed, but it is also possible to arbitrarily seek any random position in a partition, or even to replay all events since the beginning. Also, consumer groups can divide work by reading from different partitions and parallelizing event processing.

It is easy to think of Kafka as a messaging system like ActiveMQ, and in some cases Kafka does make very fine messaging middleware, but it should still be considered event-streaming middleware.

In a message broker, messages disappear when they have been consumed from a queue, or when they expire. Kafka partitions eventually evict records, either using a partition size limit (such as 2 GB) or using some eviction delay (such as two weeks). Kafka records should be considered “semi-durable,” as they will eventually disappear. It is possible to configure the partitions in a topic to keep events forever, but this is quite rare, as events are expected to produce durable effects when consumed. For instance, the ingestion service produces incoming step update records, and the activity service turns these records into long-term facts in a database. Another interesting feature of Kafka is that topics can be replayed at will, so new services can join and consume a stream at their own pace.
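
As an illustration, here is a minimal sketch of replaying a topic from the beginning with the Vert.x Kafka client. It assumes a consumer configuration map (config) like the ones we'll build later in this chapter, and it rewinds the incoming.steps topic that our ingestion service writes to:

KafkaConsumer<String, JsonObject> consumer = KafkaConsumer.create(vertx, config);
consumer.partitionsAssignedHandler(partitions -> {
  // Rewind each newly assigned partition to its first retained record
  for (TopicPartition partition : partitions) {
    consumer.seekToBeginning(partition);
  }
});
consumer.subscribe("incoming.steps");

Every record still retained in the topic then flows through the consumer again, which is exactly how a new service can catch up on past events at its own pace.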

Note I suggest reading Dylan Scott’s Kafka in Action (Manning, 2017) if you are new to Apache Kafka.

Let’s now dive into the ingestion service.

9.2 Reliably ingesting messages over HTTP and AMQP

Everything begins with the ingestion service, as it receives step count updates from the pedometers. In our (fictitious) application, we can expect that several types of pedometers will be available, and that they have different communication capabilities. For example, some devices may directly talk to the ingestion service over the internet, while others may need to reach a gateway that forwards updates to the ingestion service.

This is why we offer two interfaces for ingesting device updates:

  • A device can connect to the HTTP API provided by the ingestion service.

  • A device can forward an update to a message broker, and the ingestion service receives the updates from the broker.

Once an update has been received, it must be validated and then sent to a Kafka topic. It is interesting to explore both the AMQP and HTTP interfaces, as we can see similarities in their implementations but also differences in acknowledging device updates.

9.2.1 Ingesting from AMQP

We’ll start with the AMQP ingestion. We first need to create an AMQP client that connects to the broker. The following listing shows the client configuration code.

Listing 9.1 AMQP client configuration

private AmqpClientOptions amqpConfig() {
  return new AmqpClientOptions()
    .setHost("localhost")
    .setPort(5672)
    .setUsername("artemis")                    
    .setPassword("simetraehcapa");
}
// (...)

AmqpClientOptions amqpOptions = amqpConfig();
AmqpReceiverOptions receiverOptions = new AmqpReceiverOptions()
  .setAutoAcknowledgement(false)               
  .setDurable(true);                           

The credentials are the default ones from the Docker image.

We will manually acknowledge incoming messages.

We want durable messaging.

The amqpConfig method that we use here provides a configuration with hardcoded values. This is great for the testing we do in this book, but for production you would, of course, resolve credentials, hostnames, and port numbers from some external source. These could be environment variables or a registry service, such as Apache ZooKeeper or Consul. We also set up the connection for durable messaging and declare manual acknowledgment, as we want to retry message processing if writing to a Kafka topic fails.
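
For instance, here is a minimal sketch of resolving the same options from environment variables. The variable names (AMQP_HOST, AMQP_PORT, AMQP_USERNAME, AMQP_PASSWORD) are assumptions for illustration, not part of the project's code:

private AmqpClientOptions amqpConfig() {
  Map<String, String> env = System.getenv();  // java.util.Map
  return new AmqpClientOptions()
    .setHost(env.getOrDefault("AMQP_HOST", "localhost"))
    .setPort(Integer.parseInt(env.getOrDefault("AMQP_PORT", "5672")))
    .setUsername(env.getOrDefault("AMQP_USERNAME", "artemis"))
    .setPassword(env.getOrDefault("AMQP_PASSWORD", "simetraehcapa"));
}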

The next step is to set up the event-processing pipeline for incoming AMQP messages. We use RxJava to dispatch messages to a processing function, log errors, and recover from errors, as shown in the following listing.

Listing 9.2 AMQP event-processing pipeline

AmqpClient.create(vertx, amqpOptions)                                      
  .rxConnect()
  .flatMap(conn -> conn.rxCreateReceiver("step-events", receiverOptions))  
  .flatMapPublisher(AmqpReceiver::toFlowable)                              
  .doOnError(this::logAmqpError)                                           
  .retryWhen(this::retryLater)                                             
  .subscribe(this::handleAmqpMessage);                                     

Create an AMQP client.

Create a message receiver from the step-events destination.

Create a Flowable of AMQP messages.

Error logging

Retry logic

Subscription that dispatches incoming messages

This pipeline is interesting as it is purely declarative. It starts with the creation of a client, and then it obtains a receiver for the step-events durable queue and a flow of messages. From there we declare what to do upon receiving a message or an error. We also keep the code short and clean by using Java method references rather than lambdas. But what do the logAmqpError, retryLater, and handleAmqpMessage methods do?

Logging errors is not very complicated.

Listing 9.3 Logging AMQP errors

private void logAmqpError(Throwable err) {
  logger.error("Woops AMQP", err);           
}

Log the error and stack trace.

Errors happen. For instance, we can lose the connection to the AMQP broker. In this case, an error passes along the pipeline, and logAmqpError logs it, but doOnError lets the error propagate to subscribers.

We then need to retry connecting to the AMQP broker and resume receiving events, which translates to resubscribing to the source in RxJava. We can do that with the retryWhen operator, as it allows us to define our own policy. If you just want to retry a number of times, or even always, then retry is simpler. The following listing shows how we introduce a 10-second delay before resubscribing.

Listing 9.4 Recovering from errors with a delayed resubscription

private Flowable<Throwable> retryLater(Flowable<Throwable> errs) {
  return errs.delay(10, TimeUnit.SECONDS, RxHelper.scheduler(vertx));   
}

It is important to use the scheduler parameter to process events on a Vert.x event loop.

The retryLater method works as follows:

  • It takes a Flowable of errors as its input, since we are in a Flowable of AMQP messages.

  • It returns a Flowable of anything, where

    • Emitting onComplete or onError does not trigger a resubscription.
    • Emitting onNext (no matter what the value is) triggers a resubscription.

To delay the resubscription by 10 seconds, we use the delay operator. It will eventually emit a value, so onNext will be called and a resubscription will happen. You can, of course, think of more elaborate handlers, like limiting the number of retries or using an exponential back-off strategy. We will use this pattern a lot, as it greatly simplifies the error-recovery logic.
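
For instance, here is a minimal sketch (not from the project's code) of a variant that retries at most five times, doubling the delay before each resubscription:

private Flowable<Long> retryWithBackoff(Flowable<Throwable> errs) {
  return errs
    .zipWith(Flowable.range(1, 5), (err, attempt) -> attempt)  // cap at 5 retries
    .flatMap(attempt -> Flowable.timer((long) Math.pow(2, attempt),
      TimeUnit.SECONDS, RxHelper.scheduler(vertx)));           // 2, 4, 8, 16, 32 seconds
}

Once the five attempts are exhausted, the inner Flowable completes, so no further resubscription happens and the pipeline terminates. Plugging this in is just a matter of passing this::retryWithBackoff instead of this::retryLater to retryWhen in listing 9.2.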

9.2.2 Translating AMQP messages to Kafka records

The following listing contains the method that handles incoming AMQP messages, validates them, and then pushes them as Kafka records.

Listing 9.5 Handling AMQP messages

private void handleAmqpMessage(AmqpMessage message) {
  if (!"application/json".equals(message.contentType())
      || invalidIngestedJson(message.bodyAsJsonObject())) {
    logger.error("Invalid AMQP message (discarded): {}", message.bodyAsBinary());
    message.accepted();
    return;
  }
  JsonObject payload = message.bodyAsJsonObject();
  KafkaProducerRecord<String, JsonObject> record = makeKafkaRecord(payload);
  updateProducer.rxSend(record).subscribe(
    ok -> message.accepted(),
    err -> {
      logger.error("AMQP ingestion failed", err);
      message.rejected();
    });
}

Check for a valid JSON message.

Prepare a Kafka record.

Acknowledge the AMQP message.

Reject the AMQP message.

The handleAmqpMessage method first performs some validation on the incoming AMQP message and then prepares a Kafka record. The AMQP message is acknowledged when the Kafka record is written, and it is rejected if the record could not be written.

tip In listing 9.5 and all subsequent services, we will directly work with JsonObject data representation. There is little point in converting the JSON representation to Java domain classes (such as an IngestionData class) given that we mostly copy and transform data. You can, of course, perform such mapping if you have to do some more complex business logic and the cost of abstraction is justified.

The invalidIngestedJson method checks that the JSON data contains all required entries, as follows.

Listing 9.6 Checking for valid JSON data

private boolean invalidIngestedJson(JsonObject payload) {
  return !payload.containsKey("deviceId") ||                
    !payload.containsKey("deviceSync") ||
    !payload.containsKey("stepsCount");
}

Checking for JSON entries

The makeKafkaRecord method in the following listing converts the AMQP message JSON to a Kafka record aimed at the incoming-steps topic.

Listing 9.7 Preparing a Kafka record

private KafkaProducerRecord<String, JsonObject> makeKafkaRecord(JsonObject payload) {
  String deviceId = payload.getString("deviceId");
  JsonObject recordData = new JsonObject()
    .put("deviceId", deviceId)
    .put("deviceSync", payload.getLong("deviceSync"))
    .put("stepsCount", payload.getInteger("stepsCount"));
  return KafkaProducerRecord.create("incoming.steps", deviceId, recordData);
}

We copy JSON data.

Record with key deviceId and JSON data

We could avoid copying all JSON entries manually and just pass the JSON from the AMQP message to the Kafka record. This, however, helps ensure that no extra data ends up in the Kafka record.

The updateProducer field is of type KafkaProducer<String, JsonObject> because it produces messages with string keys and JSON payloads. Instances of KafkaProducer are created by passing configuration from a Map as follows.

Listing 9.8 Configuring a Kafka producer

Map<String, String> kafkaConfig() {
  Map<String, String> config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
  config.put("value.serializer",
    "io.vertx.kafka.client.serialization.JsonObjectSerializer");
  config.put("acks", "1");
  return config;
}
// (...)

// in rxStart()
updateProducer = KafkaProducer.create(vertx, kafkaConfig());

Class to serialize keys from strings

Class to serialize values from Vert.x JsonObject

Create a Vert.x Kafka producer.

The configuration especially specifies the serializer (or deserializer) classes, as Kafka records need to be mapped to Java types. StringSerializer comes from the Kafka client library, and it serializes Java strings to Kafka data, whereas JsonObjectSerializer comes from Vert.x and serializes JsonObject data. You need to specify correct serializer classes for both your keys and values. Similarly, you will need to configure deserializers when reading from Kafka topics.

tip The Vert.x Kafka module wraps the Java client from the Apache Kafka project, and all configuration key/value pairs match those from the Kafka Java client documentation.
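
For reference, here is a sketch of the consumer-side counterpart of listing 9.8; we'll rely on a configuration like this when reading from topics in section 9.3. The group.id value here is an assumption for illustration:

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");    // keys are strings
config.put("value.deserializer",
  "io.vertx.kafka.client.serialization.JsonObjectDeserializer");  // values are JsonObject
config.put("group.id", "my-consumer-group");                      // consumer group name (assumption)
config.put("auto.offset.reset", "earliest");                      // start from the oldest record
KafkaConsumer<String, JsonObject> consumer = KafkaConsumer.create(vertx, config);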

9.2.3 Ingesting from HTTP

The code to ingest from HTTP is very similar to that for ingesting from AMQP. The most notable difference is that an HTTP status code needs to be set, so that the device that sent an update knows whether the ingestion succeeded, or whether it failed and the update must be retried later.

We first need an HTTP server and router.

Listing 9.9 HTTP server for ingestion

Router router = Router.router(vertx);
router.post().handler(BodyHandler.create());        
router.post("/ingest").handler(this::httpIngest);

return vertx.createHttpServer()
  .requestHandler(router)
  .rxListen(HTTP_PORT)
  .ignoreElement();

BodyHandler decodes HTTP request bodies.

The httpIngest method is shown in the next listing, and it’s quite similar to handleAmqpMessage.

Listing 9.10 Ingesting updates from HTTP

private void httpIngest(RoutingContext ctx) {
  JsonObject payload = ctx.getBodyAsJson();
  if (invalidIngestedJson(payload)) {                                    
    logger.error("Invalid HTTP JSON (discarded): {}", payload.encode());
    ctx.fail(400);                                                       
    return;
  }
  KafkaProducerRecord<String, JsonObject> record = makeKafkaRecord(payload);
  updateProducer.rxSend(record).subscribe(
    ok -> ctx.response().end(),                                          
    err -> {
      logger.error("HTTP ingestion failed", err);
      ctx.fail(500);                                                     
    });
}

Check the JSON entries.

Bad JSON; let the requester know that.

Successful ingestion

The ingestion failed; let the requester know that.

HTTP status codes are important for letting the client know if the payload is incorrect (400), if the ingestion failed due to some (temporary) error (500), or if the ingestion succeeded (200).
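
To illustrate the contract from the device side, here is a sketch of a client posting an update and treating any non-2xx response as a signal to retry later. The ingestion service's port number (3002) is an assumption for illustration:

WebClient client = WebClient.create(vertx);
JsonObject update = new JsonObject()
  .put("deviceId", "123")
  .put("deviceSync", 1L)
  .put("stepsCount", 320);
client.post(3002, "localhost", "/ingest")
  .expect(ResponsePredicate.SC_SUCCESS)   // any non-2xx status becomes an error
  .rxSendJsonObject(update)
  .subscribe(
    response -> logger.info("Update ingested"),
    err -> logger.error("Ingestion failed, retrying later", err));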

The ingestion service is a good example of integration using different input protocols. Let’s now explore more of Apache Kafka with Vert.x through the congratulation service.

9.3 Sending congratulation emails

While the ingestion service produces Kafka events, the congratulation service consumes Kafka events.

The activity service generates daily step events whenever a device update has been received. Each event contains the number of steps recorded for the originating device on the current day. The congratulation service can observe these events as they are sent to the daily.step.updates Kafka topic, and it can target the events where the number of steps reaches at least 10,000.

9.3.1 Listening for daily step update events

The events sent to the daily.step.updates Kafka topic are JSON data with the following content:

  • deviceId is the device identifier.

  • timestamp is the timestamp when the event was produced in the activity service.

  • stepsCount is the number of steps for the current day.

The Kafka records also have a key, which is the concatenation of several parameters: deviceId:year-month-day. In this scheme, all records of device 1a2b produced on October 6th 2019 have the key 1a2b:2019-10-06. As you will shortly see, the key will be useful not just to ensure that events for a given device are consumed in order, but also to ensure that we don’t send more than one congratulation email per day.
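
For illustration, here is a sketch of how such a key could be assembled; it is hypothetical, since the activity service's code is not shown in this chapter:

private String dailyKey(String deviceId) {
  // LocalDate#toString emits ISO-8601 (yyyy-MM-dd), giving keys like "1a2b:2019-10-06"
  return deviceId + ":" + java.time.LocalDate.now();
}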

The pipeline for processing daily step events is shown in figure 9.3.

Figure 9.3 Pipeline from daily step counts to congratulation emails

Daily step updates flow from the daily.step.updates Kafka topic, and then

  1. We discard events where the number of steps is less than 10,000.

  2. We discard events for which an event with the same key has already been processed.

  3. We send an email.

The following listing contains the corresponding RxJava pipeline.

Listing 9.11 Kafka RxJava pipeline for receiving and processing daily step updates

KafkaConsumer.<String, JsonObject>create(vertx, KafkaConfig.consumerConfig("congrats-service"))
  .subscribe("daily.step.updates")
  .toFlowable()
  .filter(this::above10k)
  .distinct(KafkaConsumerRecord::key)
  .flatMapSingle(this::sendmail)
  .doOnError(err -> logger.error("Woops", err))
  .retryWhen(this::retryLater)
  .subscribe(mailResult -> logger.info("Congratulated {}", mailResult.getRecipients()));

Subscribe to the Kafka topic.

Filter out events with less than 10,000 steps.

Discard events for which a previous event with the same key has been processed.

Asynchronous operation to send an email

Retry on error.

Log each successful congratulation.

The preceding listing uses the RxJava binding to subscribe to a Kafka topic as a Flowable of Kafka records. We then use the filter combinator to discard records with fewer than 10,000 steps, using the predicate method shown in the following listing.

Listing 9.12 Predicate for events with at least 10,000 steps

private boolean above10k(KafkaConsumerRecord<String, JsonObject> record) {
  return record.value().getInteger("stepsCount") >= 10_000;                
}

Predicate on JSON data.

The distinct combinator in listing 9.11 ensures that only one event for each Kafka record key is retained, right after filter. This is to avoid sending more than one congratulation email to a user on a given day, as we could easily have a first event with, say, 10,100 steps, followed later by another event with 10,600 steps, and so on. Note that this design is not 100% bulletproof, as it requires storing already-processed key values in memory, and upon a service restart we could accidentally send a second email. This is a reasonable trade-off in our example, compared to using a persistent data store just to keep track of when an email was last sent to a user.

The rest of the pipeline uses similar event processing and retryWhen logic to resubscribe on errors. The sendmail method is an asynchronous operation to send an email--let’s look at how it works.

9.3.2 Sending emails

The vertx-mail-client module offers an SMTP client. The following listing shows how to create such a client.

Listing 9.13 Creating an SMTP client

MailClient mailClient = MailClient.createShared(vertx, MailerConfig.config());

Create a shared instance

As with many other Vert.x clients, we obtain an instance through a factory method, passing a Vertx context as well as some parameters.

The MailerConfig class provides a method to retrieve configuration data, as shown next.

Listing 9.14 Mail client configuration

class MailerConfig {
  static MailConfig config() {
    return new MailConfig()
      .setHostname("localhost")    
      .setPort(1025);              
  }
}

Server host

Server port

Again, these hardcoded values are fine for testing purposes and for keeping our code simple. The values are for connecting to MailHog, the testing SMTP server that we’re using from a Docker container. The MailConfig class supports more configuration options like SSL, authentication method, credentials, and so on.
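
As an illustration, a production-oriented configuration might look like the following sketch; the hostname and the environment variable names are placeholders, not values from the project:

static MailConfig productionConfig() {
  return new MailConfig()
    .setHostname("smtp.example.com")            // placeholder SMTP host
    .setPort(587)                               // usual mail submission port
    .setStarttls(StartTLSOptions.REQUIRED)      // require TLS encryption
    .setLogin(LoginOption.REQUIRED)             // require authentication
    .setUsername(System.getenv("SMTP_USERNAME"))
    .setPassword(System.getenv("SMTP_PASSWORD"));
}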

A daily-steps update Kafka event applies to a device; it does not contain the name of the owner or the email address. Before we can send an email, we must first fetch the missing information (name and email) from the user profile service. We thus need two requests to that service:

  • A request of the form /owns/deviceId to get the user name

  • A request of the form /username to get the user profile and retrieve the email address

The sendmail method is shown in the following listing.

Listing 9.15 Implementation of the sendmail method

private Single<MailResult> sendmail(KafkaConsumerRecord<String, JsonObject> record) {
  String deviceId = record.value().getString("deviceId");
  Integer stepsCount = record.value().getInteger("stepsCount");
  return webClient
    .get(3000, "localhost", "/owns/" + deviceId)
    .as(BodyCodec.jsonObject())
    .rxSend()
    .map(HttpResponse::body)
    .map(json -> json.getString("username"))
    .flatMap(this::getEmail)
    .map(email -> makeEmail(stepsCount, email))
    .flatMap(mailClient::rxSendMail);
}

Extract the device identifier.

Prepare a request to find who owns the device.

Extract the body, which is a JsonObject.

Extract the username value.

Asynchronous operation to fetch the email for the user

Prepare an email message.

Asynchronously send the email.

The sendmail method is another RxJava pipeline that composes asynchronous operations and data processing, illustrated in figure 9.4.

Figure 9.4 Asynchronous operations to prepare and then send a congratulation email

It starts by issuing an HTTP request to the user profile service and finding the user name of the device owner. It then prepares another request to fetch the user profile data to get the email address. The following listing provides the implementation of the getEmail method.

Listing 9.16 Request to retrieve the email address

private Single<String> getEmail(String username) {
  return webClient
    .get(3000, "localhost", "/" + username)
    .as(BodyCodec.jsonObject())
    .rxSend()                                
    .map(HttpResponse::body)
    .map(json -> json.getString("email"));   
}

Send the request.

Keep only the email address.

The next step is to prepare an email, enclosed in a MailMessage instance, as shown in the following implementation of the makeEmail method.

Listing 9.17 Preparing an email message

private MailMessage makeEmail(Integer stepsCount, String email) {
  return new MailMessage()
    .setFrom("[email protected]")
    .setTo(email)
    .setSubject("You made it!")
    .setText("Congratulations on reaching " + stepsCount +
      " steps today!\n\n- The 10k Steps Team\n");
}

Address of the sender

Recipient address

Subject

Body

Note that for more advanced email formatting, you could use a template engine rather than text.
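
For instance, with the FreeMarker engine from the vertx-web-templ-freemarker module, the body could be rendered from a template file. This is only a sketch, and the templates/congrats.ftl file name is a hypothetical example:

FreeMarkerTemplateEngine engine = FreeMarkerTemplateEngine.create(vertx);
JsonObject context = new JsonObject().put("stepsCount", stepsCount);
Single<MailMessage> message = engine
  .rxRender(context, "templates/congrats.ftl")   // render the template to a Buffer
  .map(buffer -> new MailMessage()
    .setFrom("[email protected]")
    .setTo(email)
    .setSubject("You made it!")
    .setHtml(buffer.toString()));                // HTML body instead of plain text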

Now that you know how to do messaging and event streaming with Vert.x, let’s not forget integration testing, to ensure that both the ingestion and congratulation services work correctly.

9.4 Integration tests

Testing the ingestion service involves sending device updates over AMQP and HTTP, and observing the Kafka topics. Conversely, testing the congratulation service involves sending events to Kafka topics, and observing the emails.

9.4.1 Ingestion testing

Testing the ingestion service requires sending a message over AMQP or HTTP, and then checking that a Kafka record has been emitted, as shown in figure 9.5.

Figure 9.5 Ingestion integration-test overview

The IntegrationTest class in the ingestion service source code uses JUnit 5 and Docker containers to start an AMQP broker, Apache Kafka, and Apache ZooKeeper. The following listing shows the test preparation.

Listing 9.18 Ingestion test preparation

@BeforeEach
void setup(Vertx vertx, VertxTestContext testContext) {
  kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig());
  amqpClient = AmqpClient.create(vertx, amqClientOptions());
  KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, kafkaConfig());
  vertx
    .rxDeployVerticle(new IngesterVerticle())
    .delay(500, TimeUnit.MILLISECONDS, RxHelper.scheduler(vertx))
    .flatMapCompletable(id -> adminClient.rxDeleteTopics(singletonList("incoming.steps")))
    .onErrorComplete()
    .subscribe(testContext::completeNow, testContext::failNow);
}

Kafka consumer

AMQP client

Client to administer Kafka

Deploy the ingestion verticle.

Delete the incoming.steps topic if it exists.

The preparation consists of deploying the IngesterVerticle verticle and then deleting any existing incoming.steps topic. This ensures that tests do not pollute each other with leftover Kafka events. Note the onErrorComplete combinator: it ensures progress, because deleting a topic raises an error when the topic does not exist, which is typically the case for the first test that runs. Of course, onErrorComplete can also mask a deployment failure of IngesterVerticle, but we would find that out in the test executions.

The following listing shows the preamble of the test case where a well-formed AMQP message is being ingested.

Listing 9.19 AMQP ingestion test preamble

@Test
@DisplayName("Ingest a well-formed AMQP message")
void amqIngest(VertxTestContext testContext) {
  JsonObject body = new JsonObject().put("deviceId", "123")
    .put("deviceSync", 1L).put("stepsCount", 500);
  amqpClient.rxConnect()                                              
    .flatMap(connection -> connection.rxCreateSender("step-events"))  
    .subscribe(sender -> {
        AmqpMessage msg = AmqpMessage.create()                        
          .durable(true)
          .ttl(5000)
          .withJsonObjectAsBody(body).build();
        sender.send(msg);                                             
      },
      testContext::failNow);
  // (...)
}

Open an AMQP client connection.

Create a sender to the step-events destination.

Create an AMQP message.

Send the message.

The AMQP client sends a message that we know is well-formed, as its body contains all the required JSON entries.

Once this is done, we need to check that a Kafka record has been sent, as follows.

Listing 9.20 AMQP ingestion test: checking for a Kafka record

kafkaConsumer.subscribe("incoming.steps")                      
  .toFlowable()
  .subscribe(
    record -> testContext.verify(() -> {                       
      assertThat(record.key()).isEqualTo("123");
      JsonObject json = record.value();
      assertThat(json.getString("deviceId")).isEqualTo("123");
      assertThat(json.getLong("deviceSync")).isEqualTo(1L);
      assertThat(json.getInteger("stepsCount")).isEqualTo(500);
      testContext.completeNow();                               
    }),
    testContext::failNow);                                     

Subscribe to the Kafka topic.

Perform assertions on the Kafka record.

The test passes.

Fail the test on any error.

Of course, we also need to test what happens when an incorrect message is sent, like an empty JSON document. We must check that no Kafka record is being emitted, as in the following listing.

Listing 9.21 Ingesting a bad JSON document

@Test
@DisplayName("Ingest a badly-formed AMQP message and observe no Kafka record")
void amqIngestWrong(Vertx vertx, VertxTestContext testContext) {
  JsonObject body = new JsonObject();
  // (...)

  kafkaConsumer.subscribe("incoming.steps")
    .toFlowable()
    .timeout(3, TimeUnit.SECONDS, RxHelper.scheduler(vertx))
    .subscribe(
      record -> testContext.failNow(new IllegalStateException("We must not get a record")),
      err -> {
        if (err instanceof TimeoutException) {
          testContext.completeNow();
        } else {
          testContext.failNow(err);
        }
      });
}

Empty JSON

Send it (same code as in listing 9.19)

Wait for three seconds.

Check that this is the error we expected!

The timeout in the RxJava pipeline is important, as we need to let some time elapse to be sure that no Kafka record has been sent. The remainder of the IntegrationTest class is quite similar, with two test cases for the HTTP ingestion: one that checks what happens when a correct payload is sent, and one where the payload is an empty JSON document.

9.4.2 Congratulation email testing

Testing the behavior of the congratulation service is more involved than the ingestion, as there are more moving parts in the test environment, as illustrated in figure 9.6.

Figure 9.6 Congratulation service integration-test overview

The goal is to send Kafka records and then observe the emails that have been sent (or not sent). Interestingly, MailHog is not just an SMTP server; it also provides a web interface and an HTTP API to simulate an email inbox. This allows us to perform tests by sending Kafka records, and then checking what emails have been received in the inbox.

The CongratsTest class features a prepare initialization method that creates a Kafka producer (to send Kafka events) and a Vert.x web client (to query the inbox). The remaining steps of the prepare method to set up the environment are shown in the following listing.

Listing 9.22 Preparing the congratulation service integration test

KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, conf);
adminClient
  .rxDeleteTopics(Arrays.asList("incoming.steps", "daily.step.updates"))    
  .onErrorComplete()
  .andThen(vertx.rxDeployVerticle(new CongratsVerticle()))                  
  .ignoreElement()
  .andThen(vertx.rxDeployVerticle(new FakeUserService()))                   
  .ignoreElement()
  .andThen(webClient.delete(8025, "localhost", "/api/v1/messages").rxSend())
  .ignoreElement()
  .subscribe(testContext::completeNow, testContext::failNow);

Delete Kafka topics.

Deploy the verticle.

Deploy a mock user account service.

Delete all messages from the inbox.

We first delete existing Kafka topics, and then we deploy the verticle under test. We also deploy a verticle to mock the user profile service and delete all messages from the inbox by making an HTTP DELETE query to the MailHog instance.

The FakeUserService verticle found in the test source exposes an HTTP service with the minimal level of functionality to replace the real user profile service in our tests. All requests to find out who owns a device point to user Foo, and retrieving the details of user Foo gives just the username and email. The following listing shows an excerpt with the code for answering a user details request with information for user Foo and just the JSON entries needed for CongratsVerticle to operate.

Listing 9.23 Excerpt from the FakeUserService class

router.get("/:username").handler(this::username);             
//(...)

private void username(RoutingContext ctx) {
  logger.info("User data request {}", ctx.request().path());
  JsonObject notAllData = new JsonObject()                    
    .put("username", "Foo")
    .put("email", "[email protected]");
  ctx.response()
    .putHeader("Content-Type", "application/json")
    .end(notAllData.encode());
}

Route for a user profile info

JSON with just the required data for the service and test
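
The route for finding a device's owner is similar; a sketch of it (an assumption, consistent with the behavior described above) could look like this:

router.get("/owns/:deviceId").handler(this::owns);
// (...)

private void owns(RoutingContext ctx) {
  // Every device is owned by user Foo in this fake service
  JsonObject notAllData = new JsonObject().put("username", "Foo");
  ctx.response()
    .putHeader("Content-Type", "application/json")
    .end(notAllData.encode());
}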

This way we have good isolation of the congratulation service for testing. We could also have deployed the real user profile service, but that would have involved preparing a database with some data. It is always better to replace dependent services with mock ones when you can.

The next listing shows the full test case for checking that no email is sent on a Kafka record with less than 10,000 steps.

Listing 9.24 Checking that no mail has been sent for less than 10,000 steps

@Test
@DisplayName("No email must be sent below 10k steps")
void checkNothingBelow10k(Vertx vertx, VertxTestContext testContext) {
  producer
    .rxSend(record("123", 5000))
    .ignoreElement()
    .delay(3, TimeUnit.SECONDS, RxHelper.scheduler(vertx))
    .andThen(webClient
      .get(8025, "localhost", "/api/v2/search?kind=to&[email protected]")
      .as(BodyCodec.jsonObject()).rxSend())
    .map(HttpResponse::body)
    .subscribe(
      json -> {
        testContext.verify(() -> assertThat(json.getInteger("total")).isEqualTo(0));
        testContext.completeNow();
      },
      testContext::failNow);
}

Kafka record for device 123 and 5000 steps

Wait for three seconds after the message has been sent.

Query all messages for email [email protected].

Check that there is no message.
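
The record helper method used above is not shown in the listing; here is a sketch of what it could look like, following the deviceId:year-month-day key scheme described in section 9.3.1:

private KafkaProducerRecord<String, JsonObject> record(String deviceId, int stepsCount) {
  // Keys look like "123:2019-10-06" (LocalDate#toString is ISO-8601)
  String key = deviceId + ":" + java.time.LocalDate.now();
  JsonObject json = new JsonObject()
    .put("deviceId", deviceId)
    .put("timestamp", java.time.LocalDateTime.now().toString())
    .put("stepsCount", stepsCount);
  return KafkaProducerRecord.create("daily.step.updates", key, json);
}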

The MailHog API allows us to check what messages have been sent. The next listing checks whether an email was sent for more than 10,000 steps.

Listing 9.25 Checking whether an email was sent for more than 10,000 steps

producer
  .rxSend(record("123", 11_000))
  .ignoreElement()
  .delay(3, TimeUnit.SECONDS, RxHelper.scheduler(vertx))
  .andThen(webClient
    .get(8025, "localhost", "/api/v2/search?kind=to&[email protected]")
    .as(BodyCodec.jsonObject()).rxSend())
  .map(HttpResponse::body)
  .subscribe(
    json -> {
      testContext.verify(() -> assertThat(json.getInteger("total")).isEqualTo(1));
      testContext.completeNow();
    },
    testContext::failNow);

A record with 11,000 steps

We must have one message.

The last test case in the checkNotTwiceToday method checks that only one email was sent for two successive records with more than 10,000 steps. I haven’t reproduced the code here due to its verbosity, but you can get it from the book’s source code repository.

This concludes the design, implementation, and testing of two services that use messaging and event streaming. The next chapter focuses on Vert.x and data sources.

Summary

  • AMQP is a standard protocol for message brokers, and you saw how to consume and produce AMQP messages with Vert.x and Apache ActiveMQ.

  • Apache Kafka is event-streaming middleware that allows services to replay events at will. Vert.x provides efficient integration with Kafka.

  • RxJava allows you to write event-processing pipelines in a declarative fashion, and with built-in error recovery.

  • We explored strategies for writing integration tests with AMQP, Kafka, and test containers by sending messages from tests to replace external components.

  • MailHog is a test-friendly SMTP server that exposes a convenient API for inspecting what emails have been sent.
