AMQP fundamentals

If you've already used JMS, then you're aware that it has queues and topics. AMQP has queues as well but the semantics are different.

Each message sent by a JMS-based producer is consumed by just one of the clients of that queue. AMQP-based producers don't publish directly to queues but to exchanges instead. When queues are declared, they must be bound to an exchange. Multiple queues can be bound to the same exchange, emulating the concept of topics.

JMS has message selectors which allow consumers to be selective about the messages they receive from either queues or topics. AMQP has routing keys that behave differently based on the type of the exchange, as listed next.

A direct exchange routes messages based on a fixed routing key, often the name of the queue. For example, the code we just looked at used learning-spring-boot as the name of the exchange and comments.new as the routing key. Any consumer that binds its own queue to that exchange with a routing key of comments.new will receive a copy of each message posted earlier.

A topic exchange allows routing keys to have wildcards like comments.*. This situation best suits clients where the actual routing key isn't known until a user provides the criteria. For example, imagine a stock-trading application where the user must provide a list of ticker symbols he or she is interested in monitoring.

A fanout exchange blindly broadcasts every message to every queue that is bound to it, regardless of the routing key.
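
To make these exchange types concrete, here is a hedged sketch of how each could be declared with Spring AMQP's core classes. The learning-spring-boot exchange name and comments.new routing key come from our code; every other name here is made up purely for illustration:

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExchangeSketch {

  @Bean
  DirectExchange directExchange() {
    // Routes on an exact routing key match, e.g. "comments.new"
    return new DirectExchange("learning-spring-boot");
  }

  @Bean
  TopicExchange topicExchange() {
    // Routes on wildcard patterns, e.g. "comments.*" (name is illustrative)
    return new TopicExchange("hypothetical-topics");
  }

  @Bean
  FanoutExchange fanoutExchange() {
    // Ignores routing keys; every bound queue gets a copy (name is illustrative)
    return new FanoutExchange("hypothetical-broadcast");
  }

  @Bean
  Binding commentsBinding(DirectExchange directExchange) {
    // Binds an illustrative queue to the direct exchange with a fixed key
    return BindingBuilder
      .bind(new Queue("hypothetical-comments-queue"))
      .to(directExchange)
      .with("comments.new");
  }
}
```

With the direct exchange above, a message published with the routing key comments.new lands in the bound queue, while the fanout exchange would copy every message to all of its queues regardless of key.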

To explore these AMQP semantics further, let's look at the CommentService (also in the comments subpackage) in chunks:

    @Service 
    public class CommentService { 
 
      private CommentWriterRepository repository; 
 
      public CommentService(CommentWriterRepository repository) { 
        this.repository = repository; 
      }
... more to come below...
}

This preceding code can be described as follows:

  • The @Service annotation marks it as a Spring service to be registered with the application context on startup
  • CommentWriterRepository is a Spring Data repository used to write new comments and is initialized by the constructor injection

This brings us to the meat of the service:

    @RabbitListener(bindings = @QueueBinding( 
      value = @Queue, 
      exchange = @Exchange(value = "learning-spring-boot"), 
      key = "comments.new" 
    )) 
    public void save(Comment newComment) { 
      repository 
       .save(newComment) 
       .log("commentService-save") 
       .subscribe(); 
    } 

This last little function packs a punch, so let's take it apart:

  • The @RabbitListener annotation is the easiest way to register methods to consume messages.
  • The @QueueBinding annotation is the easiest way to declare the queue and the exchange it's bound to on-the-fly. In this case, it creates an anonymous queue for this method and binds to the learning-spring-boot exchange.
  • The routing key for this method is comments.new, meaning any message posted to the learning-spring-boot exchange with that exact routing key will cause this method to be invoked.
  • It's possible for the @RabbitListener methods to receive a Spring AMQP Message, a Spring Messaging Message, various message headers, as well as a plain old Java object (which is what we have here).
  • The method itself invokes our CommentWriterRepository to actually save the comment in the data store.
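
The signature flexibility mentioned in the fourth bullet can be sketched as follows. This is a hypothetical variant of save() (not the book's code), receiving a Spring Messaging Message wrapper plus a header instead of the bare POJO:

```java
@RabbitListener(bindings = @QueueBinding(
  value = @Queue,
  exchange = @Exchange(value = "learning-spring-boot"),
  key = "comments.new"
))
public void save(Message<Comment> message,
    @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
  // getPayload() yields the deserialized Comment; routingKey carries
  // "comments.new" for messages routed to this method
  repository
    .save(message.getPayload())
    .log("commentService-save")
    .subscribe();
}
```

Note that Message here is org.springframework.messaging.Message, while @Header and AmqpHeaders come from Spring Messaging and Spring AMQP respectively.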

To use RabbitMQ, we would normally need @EnableRabbit, but thanks to Spring Boot, it's automatically activated when spring-boot-starter-amqp is on the classpath. Once again, Boot knows what we want and just does it.

An important thing to understand is that @RabbitListener makes it possible to dynamically create all the exchanges and queues needed to operate. However, it only works if an instance of AmqpAdmin is in the application context. Without it, ALL exchanges and queues must be declared as separate Spring beans. But Spring Boot's RabbitMQ autoconfiguration policy provides one, so no sweat!

There is one slight issue that will prevent this method from operating--object serialization. If we had declared the method signature to provide us with a Spring AMQP Message object, we would pull down a byte array. However, out of the box, Spring AMQP has limited functionality when it comes to serializing custom domain objects; with no effort, it can handle simple strings and serializables.

But for custom domain objects, there is a better solution--a Spring AMQP message converter, as shown next:

    @Bean 
    Jackson2JsonMessageConverter jackson2JsonMessageConverter() { 
      return new Jackson2JsonMessageConverter(); 
    } 

This preceding bean, listed right below the save(Comment newComment) method, can be described as follows:

  • @Bean registers this as a bean definition.
  • It creates Jackson2JsonMessageConverter, an implementation of Spring AMQP's MessageConverter, used to serialize and deserialize Spring AMQP Message objects. In this case, it uses Jackson to convert POJOs to/from JSON strings.

Spring Boot's RabbitMQ autoconfiguration policy will look for any implementation of Spring AMQP's MessageConverter instances and register them with both the RabbitTemplate we used earlier as well as the SimpleMessageListenerContainer that it creates when it spots @RabbitListener in our code.
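
Boot handles this wiring for us, but a hand-wired equivalent of what the autoconfiguration does for the template might look like the following sketch (shown only to illustrate the mechanism, not code from our application):

```java
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
    Jackson2JsonMessageConverter converter) {
  // Roughly what Boot's autoconfiguration does: plug the converter
  // into the template so POJOs are marshaled to/from JSON
  RabbitTemplate template = new RabbitTemplate(connectionFactory);
  template.setMessageConverter(converter);
  return template;
}
```

Here ConnectionFactory is Spring AMQP's org.springframework.amqp.rabbit.connection.ConnectionFactory, which Boot also provides automatically.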

To start our application with a clean slate, we have this code at the bottom of CommentService:

    @Bean 
    CommandLineRunner setUp(MongoOperations operations) { 
      return args -> { 
        operations.dropCollection(Comment.class); 
      }; 
    } 

The last code can be described as follows:

  • The @Bean annotation will register this chunk of code automatically
  • By implementing Spring Boot's CommandLineRunner interface, the Java 8 lambda expression will run itself when all beans have been created
  • It receives a copy of MongoOperations, the blocking MongoDB object we can use to drop the entire collection based on Comment

This code is handy for development, but it should either be removed in production or wrapped in a @Profile("dev") annotation so that it ONLY runs when spring.profiles.active=dev is set.
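
A sketch of that @Profile-guarded variant could look like this:

```java
@Bean
@Profile("dev")  // bean only exists when spring.profiles.active=dev
CommandLineRunner setUp(MongoOperations operations) {
  return args -> operations.dropCollection(Comment.class);
}
```
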

To persist comments in our data store, we have the following Spring Data repository:

    public interface CommentWriterRepository 
     extends Repository<Comment, String> { 
 
       Mono<Comment> save(Comment newComment); 
 
       // Needed to support save() 
       Mono<Comment> findById(String id); 
    } 

This preceding repository isn't too difficult to dissect, and that can be done as follows:

  • It's an interface, which means that we don't have to write any code. We just declare the semantics and Spring Data does the rest.
  • By extending Spring Data Commons' Repository interface, it will be picked up as a repository. Being an empty interface, it comes with no predefined operations.
  • It contains a save() operation to store a new comment (and return it after it gets saved). If the ID value is null, Spring Data MongoDB will automatically generate a unique string value for us.
  • Spring Data requires a findById() operation in order to perform saves, because that's what it uses to fetch the just-saved comment in order to return it.
  • All of these method signatures use Reactor Mono types.

This repository is focused on writing data into MongoDB and nothing more. Even though it has a findById(), it's not built for reading data. That has been kept over in the images subpackage.

To finish things up in our comments subpackage, let's look at the core domain object:

    package com.greglturnquist.learningspringboot.comments; 
 
    import lombok.Data; 
 
    import org.springframework.data.annotation.Id; 
    import org.springframework.data.mongodb.core.mapping.Document; 
 
    @Data 
    @Document 
    public class Comment { 
 
      @Id private String id; 
      private String imageId; 
      private String comment; 
    } 

This previous domain object contains the following:

  • The @Data annotation tells Lombok to generate getters, setters, toString(), equals(), and hashCode() methods
  • The id field is marked with Spring Data Common's @Id annotation so we know it's the key for mapping objects
  • The imageId field is meant to hold an Image.id field linking comments to images
  • The comment field is the place to store an actual comment

Wait a second! Isn't this the exact same code found in com.greglturnquist.learningspringboot.images.Comment? It is right now. But it's important to recognize that different slices may need different attributes in the future. By keeping a slice-specific domain object, we can change one without the risk of changing the other. In fact, it's possible that we can (spoiler alert!) later in this book move this entire comments system into a separate microservice. By keeping things in nicely divided slices, the risk of tight coupling is reduced.

Another factor is that RabbitMQ is not reactive: invoking rabbitTemplate.convertAndSend() is blocking. That may sound odd given that AMQP is a pub/sub technology, but the whole process of publishing the message to the RabbitMQ broker holds up our thread and is, by definition, blocking.

So our code wraps that blocking call inside a Java Runnable and converts it into a Mono via Reactor's Mono.fromRunnable. That makes it possible to invoke the blocking task only when we're ready. It's important to know that a Mono-wrapped Runnable doesn't act like a traditional Java Runnable and doesn't get launched in a separate thread. Instead, the Runnable interface provides a convenient wrapper, and Reactor controls precisely when the run() method is invoked inside its scheduler.
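
As a sketch of that pattern (the surrounding field and method names are assumptions, not necessarily our exact code), the wrapped send might look like this:

```java
Mono<Void> send(Comment newComment) {
  // Nothing happens until a subscriber arrives; only then does Reactor's
  // scheduler invoke run(), which performs the blocking convertAndSend
  return Mono.fromRunnable(() ->
    rabbitTemplate.convertAndSend(
      "learning-spring-boot",  // exchange
      "comments.new",          // routing key
      newComment));            // payload, serialized by the converter
}
```
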

If we refresh our code in the IDE and let it restart, we can now start creating comments. Check out the following screenshot:

The preceding screenshot shows a couple of comments added to the first image and a third being written. Cool, eh?

But perhaps you're wondering why we spent all that effort splitting up the reading and writing of comments. After all, Spring Data appears to make it easy enough to define a single repository that could handle both. That may even imply that we didn't need RabbitMQ and could have let HomeController and CommentController use the repository directly instead.

The reason to use messaging is to provide a reliable way to offload work to another system. A real system that grows to thousands, if not millions, of users will see a huge flow of traffic. Think about it: on social media platforms, people write comments constantly but only view a handful at a time.

This facet of our application is designed with scalability in mind. If we had one million users, they might be writing tens of millions of messages a day. Hitching our controller directly to MongoDB could cause it to keel over. But if we push all the writes to a separate service, we can tune that service suitably; the number of reads is much smaller.
