Chapter 3. Building Reactive Microservices

In this chapter, we will build our first microservices with Vert.x. As most microservice systems use HTTP interactions, we are going to start with HTTP microservices. But because systems consist of multiple communicating microservices, we will build another microservice that consumes the first one. Then, we will demonstrate why such a design does not completely embrace reactive microservices. Finally, we will implement message-based microservices to see how messaging improves the reactiveness.

First Microservices

In this chapter we are going to implement the same set of microservices twice. The first microservice exposes a hello service that we will call hello microservice. Another consumes this service twice (concurrently). The consumer will be called hello consumer microservice. This small system illustrates not only how a service is served, but also how it is consumed. On the left side of Figure 3-1, the microservices are using HTTP interactions. The hello consumer microservice uses an HTTP client to invoke the hello microservice. On the right side, the hello consumer microservice uses messages to interact with the hello microservice. This difference impacts the reactiveness of the system.

brmj 0301
Figure 3-1. The microservices implemented in this chapter using HTTP and message-based interactions

In the previous chapter, we saw two different ways to use Vert.x APIs: callbacks and RxJava. To illustrate the differences and help you find your preferred approach, the hello microservices are implemented using the callback-based development model, while the consumers are implemented using RxJava.

Implementing HTTP Microservices

Microservices often expose their API via HTTP and are consumed using HTTP requests. Let’s see how these HTTP interactions can be implemented with Vert.x. The code developed in this section is available in the microservices/hello-microservice-http directory of the code repository.

Getting Started

Create a directory called hello-microservice-http and then generate the project structure:

mkdir hello-microservice-http
cd hello-microservice-http

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup 
  -DprojectGroupId=io.vertx.microservice 
  -DprojectArtifactId=hello-microservice-http 
  -Dverticle=io.vertx.book.http.HelloMicroservice 
  -Ddependencies=web

This command generates the Maven project and configures the Vert.x Maven plug-in. In addition, it adds the vertx-web dependency. Vert.x Web is a module that provides everything you need to build modern web applications on top of Vert.x.

The Verticle

Open src/main/java/io/vertx/book/http/HelloMicroservice.java. The generated code of the verticle does nothing very interesting, but it’s a starting point:

package io.vertx.book.http;

import io.vertx.core.AbstractVerticle;

public class HelloMicroservice extends AbstractVerticle {

    @Override
    public void start() {

    }
}

Now, launch the following Maven command:

mvn compile vertx:run

You can now edit the verticle. Every time you save the file, the application will be recompiled and restarted automatically.

HTTP Microservice

It’s time to make our MyVerticle class do something. Let’s start with an HTTP server. As seen in the previous chapter, to create an HTTP server with Vert.x you just use:

@Override
public void start() {
    vertx.createHttpServer()
        .requestHandler(req -> req.response()
          .end("hello"))
        .listen(8080);
}

Once added and saved, you should be able to see hello at http://localhost:8080 in a browser. This code creates an HTTP server on port 8080 and registers a requestHandler that is invoked on each incoming HTTP request. For now, we just write hello in the response.

Using Routes and Parameters

Many services are invoked through web URLs, so checking the path is crucial to knowing what the request is asking for. However, doing path checking in the requestHandler to implement different actions can get complicated. Fortunately, Vert.x Web provides a Router on which we can register Routes. Routes are the mechanism by which Vert.x Web checks the path and invokes the associated action. Let’s rewrite the start method, with two routes:

@Override
public void start() {
    Router router = Router.router(vertx);
    router.get("/").handler(rc -> rc.response().end("hello"));
    router.get("/:name").handler(rc -> rc.response()
        .end("hello " + rc.pathParam("name")));

    vertx.createHttpServer()
        .requestHandler(router::accept)
        .listen(8080);
}

Once we have created the Router object, we register two routes. The first one handles requests on / and just writes hello. The second route has a path parameter (:name). The handler appends the passed value to the greeting message. Finally, we change the requestHandler of the HTTP server to use the accept method of the router.

If you didn’t stop the vertx:run execution, you should be able to open a browser to:

Producing JSON

JSON is often used in microservices. Let’s modify the previous class to produce JSON payloads:

@Override
public void start() {
    Router router = Router.router(vertx);
    router.get("/").handler(this::hello);
    router.get("/:name").handler(this::hello);
    vertx.createHttpServer()
        .requestHandler(router::accept)
        .listen(8080);
}

private void hello(RoutingContext rc) {
    String message = "hello";
    if (rc.pathParam("name") != null) {
        message += " " + rc.pathParam("name");
    }
    JsonObject json = new JsonObject().put("message", message);
    rc.response()
        .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
        .end(json.encode());
}

Vert.x provides a JsonObject class to create and manipulate JSON structures. With this code in place, you should be able to open a browser to:

Packaging and Running

Stop the vertx:run execution using Ctrl+C and execute the following command from the same directory:

mvn package

This produces a fat jar in the target directory: hello-microservice-http-1.0-SNAPSHOT.jar. While fat jars tend to be fat, here the JAR has a reasonable size (~6.3 MB) and contains everything to run the application:

java -jar target/hello-microservice-http-1.0-SNAPSHOT.jar

You can check to make sure it is running by opening: http://localhost:8080. Keep the process running as the next microservice will invoke it.

Consuming HTTP Microservices

One microservice does not form an application; you need a system of microservices. Now that we have our first microservice running, let’s write a second microservice to consume it. This second microservice also provides an HTTP facade to invoke it, and on each invocation calls the microservice we just implemented. The code shown in this section is available in the microservices/hello-consumer-microservice-http directory of the code repository.

Project Creation

As usual, let’s create a new project:

mkdir hello-consumer-microservice-http
cd hello-consumer-microservice-http

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup 
  -DprojectGroupId=io.vertx.microservice 
  -DprojectArtifactId=hello-consumer-microservice-http 
  -Dverticle=io.vertx.book.http.HelloConsumerMicroservice 
  -Ddependencies=web,web-client,rx

The last command adds another dependency: the Vert.x web client, an asynchronous HTTP client. We will use this client to call the first microservice. The command has also added the Vert.x RxJava binding we are going to use later.

Now edit the src/main/java/io/vertx/book/http/HelloConsumerMicroservice.java file and update it to contain:

package io.vertx.book.http;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.*;
import io.vertx.ext.web.client.*;
import io.vertx.ext.web.codec.BodyCodec;

public class HelloConsumerMicroservice extends AbstractVerticle {

    private WebClient client;

    @Override
    public void start() {
        client = WebClient.create(vertx);

        Router router = Router.router(vertx);
        router.get("/").handler(this::invokeMyFirstMicroservice);

        vertx.createHttpServer()
            .requestHandler(router::accept)
            .listen(8081);
    }

    private void invokeMyFirstMicroservice(RoutingContext rc) {
        HttpRequest<JsonObject> request = client
            .get(8080, "localhost","/vert.x")
            .as(BodyCodec.jsonObject());

        request.send(ar -> {
            if (ar.failed()) {
                rc.fail(ar.cause());
            } else {
                rc.response().end(ar.result().body().encode());
            }
        });
    }

}

In the start method, we create a WebClient and a Router. On the created router, we register a route on “/” and start the HTTP server, passing the router accept method as requestHandler. The handler of the route is a method reference (hello). This method uses the web client to invoke the first microservice with a specific path (/vert.x) and write the result to the HTTP response.

Once the HTTP request is created, we call send to emit the request. The handler we passed in is invoked when either the response arrives or an error occurs. The if-else block checks to see whether the invocation has succeeded. Don’t forget that it’s a remote interaction and has many reasons to fail. For instance, the first microservice may not be running. When it succeeds, we write the received payload to the response; otherwise, we reply with a 500 response.

Calling the Service More Than Once

Now let’s change the current behavior to call the hello microservice twice with two different (path) parameters:

HttpRequest<JsonObject> request1 = client
    .get(8080, "localhost", "/Luke")
    .as(BodyCodec.jsonObject());
HttpRequest<JsonObject> request2 = client
    .get(8080, "localhost", "/Leia")
    .as(BodyCodec.jsonObject());

These two requests are independent and can be executed concurrently. But here we want to write a response assembling both results. The code required to invoke the service twice and assemble the two results can become convoluted. We need to check to see whether or not the other request has been completed when we receive one of the responses. While this code would still be manageable for two requests, it becomes overly complex when we need to handle more. Fortunately, as noted in the previous chapter, we can use reactive programming and RxJava to simplify this code.

We instruct the vertx-maven-plugin to import the Vert.x RxJava API. In the HelloConsumerMicroservice, we replace the import statements with:

import io.vertx.core.json.JsonObject;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.ext.web.*;
import io.vertx.rxjava.ext.web.client.*;
import io.vertx.rxjava.ext.web.codec.BodyCodec;
import rx.Single;

With RX, the complex code we would have written to call the two requests and build a response out of them becomes much simpler:

private void invokeMyFirstMicroservice(RoutingContext rc) {
    HttpRequest<JsonObject> request1 = client
        .get(8080, "localhost", "/Luke")
        .as(BodyCodec.jsonObject());
    HttpRequest<JsonObject> request2 = client
        .get(8080, "localhost", "/Leia")
        .as(BodyCodec.jsonObject());
    Single<JsonObject> s1 = request1.rxSend()
      .map(HttpResponse::body);
    Single<JsonObject> s2 = request2.rxSend()
      .map(HttpResponse::body);
    Single
        .zip(s1, s2, (luke, leia) -> {
            // We have the results of both requests in Luke and Leia
            return new JsonObject()
                .put("Luke", luke.getString("message"))
                .put("Leia", leia.getString("message"));
        })
        .subscribe(
            result -> rc.response().end(result.encodePrettily()),
            error -> {
              error.printStackTrace();
              rc.response()
                .setStatusCode(500).end(error.getMessage());
            }
        );
}

Notice the rxSend method calls. The RxJava methods from Vert.x are prefixed with rx to be easily recognizable. The result of rxSend is a Single, i.e., an observable of one element representing the deferred result of an operation. The single.zip method takes as input a set of Single, and once all of them have received their value, calls a function with the results. Single.zip produces another Single containing the result of the function. Finally, we subscribe. This method takes two functions as parameters:

  1. The first one is called with the result of the zip function (a JSON object). We write the receive JSON payload into the HTTP response.

  2. The second one is called if something fails (timeout, exception, etc.). In this case, we respond with an empty JSON object.

With this code in place, if we open http://localhost:8081 and the hello microservice is still running we should see:

{
  "Luke" : "hello Luke",
  "Leia" : "hello Leia"
}

Are These Microservices Reactive Microservices?

At this point we have two microservices. They are independent and can be deployed and updated at their own pace. They also interact using a lightweight protocol (HTTP). But are they reactive microservices? No, they are not. Remember, to be called reactive a microservice must be:

  • Autonomous

  • Asynchronous

  • Resilient

  • Elastic

The main issue with the current design is the tight coupling between the two microservices. The web client is configured to target the first microservice explicitly. If the first microservice fails, we won’t be able to recover by calling another one. If we are under load, creating a new instance of the hello microservice won’t help us. Thanks to the Vert.x web client, the interactions are asynchronous. However, as we don’t use a virtual address (destination) to invoke the microservice, but its direct URL, it does not provide the resilience and elasticity we need.

It’s important to note that using reactive programming as in the second microservice does not give you the reactive system’s benefits. It provides an elegant development model to coordinate asynchronous actions, but it does not provide the resilience and elasticity we need.

Can we use HTTP for reactive microservices? Yes. But this requires some infrastructure to route virtual URLs to a set of services. We also need to implement a load-balancing strategy to provide elasticity and health-check support to improve resilience.

Don’t be disappointed. In the next section we will take a big step toward reactive microservices.

The Vert.x Event Bus—A Messaging Backbone

Vert.x offers an event bus allowing the different components of an application to interact using messages. Messages are sent to addresses and have a set of headers and a body. An address is an opaque string representing a destination. Message consumers register themselves to addresses to receive the messages. The event bus is also clustered, meaning it can dispatch messages over the network between distributed senders and consumers. By starting a Vert.x application in cluster mode, nodes are connected to enable shared data structure, hard-stop failure detection, and load-balancing group communication. The event bus can dispatch messages among all the nodes in the cluster. To create such a clustered configuration, you can use Apache Ignite, Apache Zookeeper, Infinispan, or Hazelcast. In this report, we are going to use Infinispan, but we won’t go into advanced configuration. For that, refer to the Infinispan documentation (http://infinispan.org/). While Infinispan (or the technology you choose) manages the node discovery and inventory, the event bus communication uses direct peer-to-peer TCP connections.

The event bus provides three types of delivery semantics. First, the send method allows a component to send a message to an address. A single consumer is going to receive the message. If more than one consumer is registered on this address, Vert.x applies a round-robin strategy to select a consumer:

// Consumer
vertx.eventBus().consumer("address", message -> {
    System.out.println("Received: '" + message.body() + "'");
});
// Sender
vertx.eventBus().send("address", "hello");

In contrast to send, you can use the publish method to deliver the message to all consumers registered on the address. Finally, the send method can be used with a reply handler. This request/response mechanism allows implementing message-based asynchronous interactions between two components:

// Consumer
vertx.eventBus().consumer("address", message -> {
    message.reply("pong");
});
// Sender
vertx.eventBus().send("address", "ping", reply -> {
    if (reply.succeeded()) {
        System.out.println("Received: " + reply.result().body());
    } else {
        // No reply or failure
        reply.cause().printStackTrace();
    }
});

If you are using Rx-ified APIs, you can use the rxSend method, which returns a Single. This Single receives a value when the reply is received. We are going to see this method in action shortly.

Message-Based Microservices

Let’s reimplement the hello microservice, this time using an event bus instead of an HTTP server to receive the request. The microservice replied to the message to provide the response.

Project Creation

Let’s create a new project. This time we are going to add the Infinispan dependency, an in-memory data grid that will be used to manage the cluster:

mkdir hello-microservice-message
cd hello-microservice-message

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup 
  -DprojectGroupId=io.vertx.microservice 
  -DprojectArtifactId=hello-microservice-message 
  -Dverticle=io.vertx.book.message.HelloMicroservice 
  -Ddependencies=infinispan

Once generated, we may need to configure Infinispan to build the cluster. The default configuration uses multicast to discover the nodes. If your network supports multicast, it should be fine. Otherwise, check the resource/cluster directory of the code repository.

Writing the Message-Driven Verticle

Edit the src/main/java/io/vertx/book/message/HelloMicroservice.java file and update the start method to be:

@Override
public void start() {
    // Receive message from the address 'hello'
    vertx.eventBus().<String>consumer("hello", message -> {
        JsonObject json = new JsonObject()
            .put("served-by", this.toString());
        // Check whether we have received a payload in the
        // incoming message
        if (message.body().isEmpty()) {
            message.reply(json.put("message", "hello"));
        } else {
            message.reply(json.put("message",
              "hello " + message.body()));
        }
    });
}

This code retrieves the eventBus from the vertx object and registers a consumer on the address hello. When a message is received, it replies to it. Depending on whether or not the incoming message has an empty body, we compute a different response. As in the example in the previous chapter, we send a JSON object back. You may be wondering why we added the served-by entry in the JSON. You’ll see why very soon. Now that the verticle is written, it’s time to launch it with:

mvn compile vertx:run 
  -Dvertx.runArgs="-cluster -Djava.net.preferIPv4Stack=true"

The -cluster tells Vert.x to start in cluster mode.

Now let’s write a microservice consuming this service.

Initiating Message-Based Interactions

In this section, we will create another microservice to invoke the hello microservice by sending a message to the hello address and get a reply. The microservice will reimplement the same logic as in the previous chapter and invoke the service twice (once with Luke and once with Leia).

As usual, let’s create a new project:

mkdir hello-consumer-microservice-message
cd hello-consumer-microservice-message

mvn io.fabric8:vertx-maven-plugin:1.0.5:setup 
  -DprojectGroupId=io.vertx.microservice 
  -DprojectArtifactId=hello-consumer-microservice-message 
  -Dverticle=io.vertx.book.message.HelloConsumerMicroservice 
  -Ddependencies=infinispan,rx

Here we also add the Vert.x RxJava support to benefit from the RX-ified APIs provided by Vert.x. If you updated the Infinispan configuration in the previous section, you need to copy it to this new project.

Now edit the io.vertx.book.message.HelloConsumerMicroservice. Since we are going to use RxJava, change the import statement to match io.vertx.rxjava.core.AbstractVerticle. Then implement the start method with:

@Override
public void start() {
  EventBus bus = vertx.eventBus();
  Single<JsonObject> obs1 = bus
    .<JsonObject>rxSend("hello", "Luke")
    .map(Message::body);
  Single<JsonObject> obs2 = bus
    .<JsonObject>rxSend("hello", "Leia")
    .map(Message::body);

  Single
    .zip(obs1, obs2, (luke, leia) ->
      new JsonObject()
        .put("Luke", luke.getString("message"))
        .put("Leia", leia.getString("message"))
    )
    .subscribe(
      x -> System.out.println(x.encode()),
      Throwable::printStackTrace);
}

This code is very similar to the code from the previous chapter. Instead of using a WebClient to invoke an HTTP endpoint, we will use the event bus to send a message to the hello address and extract the body of the reply. We use the zip operation to retrieve the two responses and build the final result. In the subscribe method, we print the final result to the console or print the stack trace.

Let’s combine this with an HTTP server. When an HTTP request is received, we invoke the hello service twice and return the built result as a response:

@Override
public void start() {
  vertx.createHttpServer()
    .requestHandler(
      req -> {
        EventBus bus = vertx.eventBus();
        Single<JsonObject> obs1 = bus
          .<JsonObject>rxSend("hello", "Luke")
          .map(Message::body);
        Single<JsonObject> obs2 = bus
          .<JsonObject>rxSend("hello", "Leia")
          .map(Message::body);

        Single
          .zip(obs1, obs2, (luke, leia) ->
            new JsonObject()
              .put("Luke", luke.getString("message")
                + " from "
                + luke.getString("served-by"))
              .put("Leia", leia.getString("message")
                + " from "
                + leia.getString("served-by"))
          )
          .subscribe(
            x -> req.response().end(x.encodePrettily()),
            t -> {
              t.printStackTrace();
              req.response().setStatusCode(500)
                .end(t.getMessage());
            }
          );
      })
    .listen(8082);
}

The last code just wraps the event bus interactions into a requestHandler and deals with the HTTP response. In case of failure, we return a JSON object containing an error message.

If you run this code with mvn compile vertx:run -Dvertx.runArgs="-cluster -Djava.net.preferIPv4Stack=true" and open your browser to http://localhost:8082, you should see something like:

{
  "Luke" : "hello Luke from ...HelloMicroservice@39721ab",
  "Leia" : "hello Leia from ...HelloMicroservice@39721ab"
}

Are We Reactive Now?

The code is very close to the HTTP-based microservice we wrote previously. The only difference is we used an event bus instead of HTTP. Does this change our reactiveness? It does! Let’s see why.

Elasticity

Elasticity is one of the characteristics not enforced by the HTTP version of the microservice. Because the microservice was targeting a specific instance of the microservice (using a hard-coded URL), it didn’t provide the elasticity we need. But now that we are using messages sent to an address, this changes the game. Let’s see how this microservice system behaves.

Remember the output of the previous execution. The returned JSON objects display the verticle having computed the hello message. The output always displays the same verticle. The message was indicating the same instance. We expected this because we had a single instance running. Now let’s see what happens with two.

Stop the vertx:run execution of the Hello microservice and run:

mvn clean package

Then, open two different terminals in the hello-microservice-message directory and issue the following command (in each terminal):

java -jar target/hello-microservice-message-1.0-SNAPSHOT.jar 
    --cluster -Djava.net.preferIPv4Stack=true

This launches two instances of the Hello microservice. Go back to your browser and refresh the page and you should see something like:

{
  "Luke" : "hello Luke from ...HelloMicroservice@16d0d069",
  "Leia" : "hello Leia from ...HelloMicroservice@411fc4f"
}

The two instances of Hello are used. The Vert.x cluster connects the different nodes, and the event bus is clustered. Thanks to the event bus round-robin, the Vert.x event bus dispatches messages to the available instances and thus balances the load among the different nodes listening to the same address.

So, by using the event bus, we have the elasticity characteristic we need.

Resilience

What about resilience? In the current code, if the hello microservice failed, we would get a failure and execute this code:

t -> {
  t.printStackTrace();
  req.response().setStatusCode(500).end(t.getMessage());
}

Even though the user gets an error message, we don’t crash, we don’t limit our scalability, and we can still handle requests. However, to improve the user experience, we should always reply in a timely fashion to the user, even if we don’t receive the responses from the service. To implement this logic, we can enhance the code with a timeout.

To illustrate this, let’s modify the Hello microservice to inject failures and misbehaviors. This code is located in the microservices/hello-microservice-faulty directory of the code repository.

This new start method randomly selects one of three strategies: (1) reply with an explicit failure, (2) forget to reply (leading to a timeout on the consumer side), or (3) send the correct result.

@Override
public void start() {
    vertx.eventBus().<String>consumer("hello", message -> {
        double chaos = Math.random();
        JsonObject json = new JsonObject()
            .put("served-by", this.toString());

        if (chaos < 0.6) {
            // Normal behavior
            if (message.body().isEmpty()) {
                message.reply(json.put("message", "hello"));
            } else {
                message.reply(json.put("message", "hello "
                  + message.body()));
            }
        } else if (chaos < 0.9) {
            System.out.println("Returning a failure");
            // Reply with a failure
            message.fail(500,
              "message processing failure");
        } else {
            System.out.println("Not replying");
            // Just do not reply, leading to a timeout on the
            // consumer side.
        }
    });
}

Repackage and restart the two instances of the Hello microservice.

With this fault injection in place, we need to improve the fault-tolerance of our consumer. Indeed, the consumer may get a timeout or receive an explicit failure. In the hello consumer microservice, change how we invoke the hello service to:

EventBus bus = vertx.eventBus();
Single<JsonObject> obs1 = bus
  .<JsonObject>rxSend("hello", "Luke")
  .subscribeOn(RxHelper.scheduler(vertx))
  .timeout(3, TimeUnit.SECONDS)
  .retry()
  .map(Message::body);
Single<JsonObject> obs2 = bus.
  <JsonObject>rxSend("hello", "Leia")
  .subscribeOn(RxHelper.scheduler(vertx))
  .timeout(3, TimeUnit.SECONDS)
  .retry()
  .map(Message::body);

This code is located in the microservices/hello-consumer-microservice-timeout directory of the code repository. The timeout method emits a failure if we don’t receive a response in the given time. The retry method reattempts to retrieve the value if it gets a failure in the form of a timeout or an explicit failure. The subscribeOn method indicates on which thread the invocations need to be done. We use the Vert.x event loop to call our callbacks. Without this, the methods would be executed by a thread from the default RxJava thread pool, breaking the Vert.x threading model. The RXHelper class is provided by Vert.x. Blindly retrying service invocations is not a very clever fault tolerance strategy. It can even be harmful. The next chapter details different approaches.

Now you can reload the page. You will always get a result, even if there are failures or timeouts. Remember that the thread is not blocked while calling the service, so you can always accept new requests and respond to them in a timely fashion. However, this timeout retry often causes more harm than good, as we will see in the next chapter.

Summary

In this section, we learned how to develop an HTTP microservice with Vert.x and also how to consume it. As we learned, hard-coding the URL of the consumed service in the code is not a brilliant idea as it breaks one of the reactive characteristics. In the second part, we replaced the HTTP interactions using messaging, which showed how messaging and the Vert.x event bus help build reactive microservices.

So, are we there yet? Yes and no. Yes, we know how to build reactive microservices, but there are a couple of shortcomings we need to look at. First, what if you only have HTTP services? How do you avoid hard-coded locations? What about resilience? We have seen timeouts and retries in this chapter, but what about circuit breakers, failovers, and bulkheads? Let’s continue the journey.

If you want to go further on these topics:

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.128.78.30