Streams for reactive microservices

Spring Cloud Stream provides an abstraction over the messaging infrastructure. The underlying messaging implementation can be RabbitMQ, Redis, or Kafka. Spring Cloud Stream provides a declarative approach for sending and receiving messages:

[Diagram: Streams for reactive microservices]

As shown in the preceding diagram, Spring Cloud Stream works on the concept of a source and a sink. The source represents the sender's side of the messaging, and the sink represents the receiver's side.

In the example shown in the diagram, the sender defines a logical queue called Source.OUTPUT to which the sender sends messages. The receiver defines a logical queue called Sink.INPUT from which the receiver retrieves messages. The physical binding of OUTPUT to INPUT is managed through the configuration. In this case, both link to the same physical queue—MyQueue on RabbitMQ. So, while at one end, Source.OUTPUT points to MyQueue, on the other end, Sink.INPUT points to the same MyQueue.
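The idea of two logical channels resolving to one physical queue can be sketched in plain Java. This is only an illustrative model, not Spring Cloud Stream code: `InMemoryBroker`, `ChannelBindings`, and `BindingDemo` are hypothetical names, and an in-memory queue stands in for RabbitMQ. The sender and the receiver each hold their own binding table, just as each microservice holds its own configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the broker: a set of named physical queues.
class InMemoryBroker {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    Queue<String> queue(String physicalName) {
        return queues.computeIfAbsent(physicalName, n -> new ConcurrentLinkedQueue<>());
    }
}

// Hypothetical binding table: maps logical channel names to physical queues,
// the role that configuration plays in the text above.
class ChannelBindings {
    private final Map<String, String> destinations = new HashMap<>();
    private final InMemoryBroker broker;

    ChannelBindings(InMemoryBroker broker) {
        this.broker = broker;
    }

    void bind(String logicalChannel, String physicalQueue) {
        destinations.put(logicalChannel, physicalQueue);
    }

    // Sender side: write to the physical queue behind the logical channel.
    void send(String logicalChannel, String payload) {
        broker.queue(resolve(logicalChannel)).add(payload);
    }

    // Receiver side: returns the next message, or null if the queue is empty.
    String receive(String logicalChannel) {
        return broker.queue(resolve(logicalChannel)).poll();
    }

    private String resolve(String logicalChannel) {
        String destination = destinations.get(logicalChannel);
        if (destination == null) {
            throw new IllegalStateException("No binding for channel " + logicalChannel);
        }
        return destination;
    }
}

class BindingDemo {
    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();

        ChannelBindings sender = new ChannelBindings(broker);
        sender.bind("output", "MyQueue");   // sender's logical channel

        ChannelBindings receiver = new ChannelBindings(broker);
        receiver.bind("input", "MyQueue");  // receiver's logical channel

        sender.send("output", "hello");
        System.out.println(receiver.receive("input")); // prints "hello"
    }
}
```

Neither side refers to `MyQueue` in its send or receive calls. Rebinding either channel to a different queue changes only the `bind` call, which is the separation the configuration-driven binding is meant to give.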

Spring Cloud Stream offers the flexibility to use multiple messaging providers in one application, such as connecting an input stream from Kafka to a Redis output stream, without having to manage the underlying complexity. Spring Cloud Stream is the basis for message-based integration. The Cloud Stream Modules subproject is another Spring Cloud library that provides many endpoint implementations.

As the next step, rebuild the inter-microservice messaging with Spring Cloud Stream. As shown in the next diagram, we will define a SearchSink connected to InventoryQ under the Search microservice. Booking will define a BookingSource, also connected to InventoryQ, for sending inventory change messages. Similarly, Check-in defines a CheckinSource for sending check-in messages, and Booking defines a sink, BookingSink, for receiving them. Both are bound to the CheckinQ queue on RabbitMQ:

[Diagram: Streams for reactive microservices]

In this example, we will use RabbitMQ as the message broker:

  1. Add the following Maven dependency to Booking, Search, and Check-in, as these are the three modules using messaging:
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  2. Add the following two properties to booking-service.properties. These properties bind the logical queue inventoryQ to the physical inventoryQ, and the logical checkInQ to the physical checkInQ:
    spring.cloud.stream.bindings.inventoryQ.destination=inventoryQ
    spring.cloud.stream.bindings.checkInQ.destination=checkInQ
  3. Add the following property to search-service.properties. This property binds the logical queue inventoryQ to the physical inventoryQ:
    spring.cloud.stream.bindings.inventoryQ.destination=inventoryQ
  4. Add the following property to checkin-service.properties. This property binds the logical queue checkInQ to the physical checkInQ:
    spring.cloud.stream.bindings.checkInQ.destination=checkInQ
  5. Commit all files to the Git repository.
  6. The next step is to edit the code. The Search microservice consumes a message from the Booking microservice. In this case, Booking is the source and Search is the sink.

    Add @EnableBinding to the Sender class of the Booking service. This lets Spring Cloud Stream autoconfigure itself based on the message broker library available on the classpath. In our case, it is RabbitMQ. The parameter BookingSource defines the logical channels to be used for this configuration:

    @EnableBinding(BookingSource.class)
    public class Sender {
  7. In this case, BookingSource defines a message channel called inventoryQ, which is physically bound to RabbitMQ's inventoryQ, as set in the configuration. BookingSource uses the @Output annotation to indicate that the channel is of the output type, that is, it carries messages going out of the module. This information is used for autoconfiguration of the message channel:
    interface BookingSource {
        // Logical channel name; interface fields are implicitly public static final
        String InventoryQ = "inventoryQ";

        @Output(InventoryQ)
        MessageChannel inventoryQ();
    }
  8. Instead of defining a custom class, we can also use the default Source class that comes with Spring Cloud Stream if the service has only one source and sink:
    public interface Source {
      @Output("output")
      MessageChannel output();
    }
  9. Define a message channel in the sender, based on BookingSource. The following code injects the output message channel named inventoryQ, which is already configured in BookingSource:
      @Output(BookingSource.InventoryQ)
      @Autowired
      private MessageChannel messageChannel;
  10. Reimplement the send method in the Booking service's Sender class:
    public void send(Object message) {
      // Wrap the payload in a Message and publish it on the bound channel
      messageChannel.send(MessageBuilder.withPayload(message).build());
    }
  11. Now add @EnableBinding to the Receiver class of the Search service, the same way we did for the Booking service:
    @EnableBinding(SearchSink.class)
    public class Receiver {
  12. In this case, the SearchSink interface looks like the following. It defines the logical sink queue to which the receiver is connected. The message channel is marked with @Input to indicate that it accepts incoming messages:
    interface SearchSink {
        // Logical channel name; interface fields are implicitly public static final
        String INVENTORYQ = "inventoryQ";

        @Input(INVENTORYQ)
        MessageChannel inventoryQ();
    }
  13. Amend the Search service to accept this message:
    @ServiceActivator(inputChannel = SearchSink.INVENTORYQ)
    public void accept(Map<String, Object> fare) {
        searchComponent.updateInventory(
                (String) fare.get("FLIGHT_NUMBER"),
                (String) fare.get("FLIGHT_DATE"),
                (int) fare.get("NEW_INVENTORY"));
    }
  14. We will still need the RabbitMQ configurations that we have in our configuration files to connect to the message broker:
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    server.port=8090
  15. Run all services, and run the website project. If everything is fine, the website project successfully executes the Search, Booking, and Check-in functions. The same can also be tested using the browser by pointing to http://localhost:8001.
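Steps 2 to 4 only work if the sending and receiving services resolve the same channel name to the same destination, so it can help to sanity-check the property files before starting everything. The following is a minimal sketch using only java.util.Properties. `BindingConfigCheck` and its methods are hypothetical helpers, not part of Spring Cloud Stream, and the property text is copied inline from the steps above rather than read from the real files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: compares channel destinations across service configs.
class BindingConfigCheck {
    private static final String PREFIX = "spring.cloud.stream.bindings.";
    private static final String SUFFIX = ".destination";

    // Parses property-file text held in memory.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    // Returns the destination configured for a channel, or null if none is set.
    static String destination(Properties props, String channel) {
        return props.getProperty(PREFIX + channel + SUFFIX);
    }

    // True only if both configs define the channel and agree on its destination.
    static boolean sameDestination(Properties a, Properties b, String channel) {
        String left = destination(a, channel);
        return left != null && left.equals(destination(b, channel));
    }

    public static void main(String[] args) {
        Properties booking = parse(
              "spring.cloud.stream.bindings.inventoryQ.destination=inventoryQ\n"
            + "spring.cloud.stream.bindings.checkInQ.destination=checkInQ\n");
        Properties search = parse(
              "spring.cloud.stream.bindings.inventoryQ.destination=inventoryQ\n");
        Properties checkin = parse(
              "spring.cloud.stream.bindings.checkInQ.destination=checkInQ\n");

        // Booking sends inventory changes that Search receives.
        System.out.println(sameDestination(booking, search, "inventoryQ")); // prints "true"
        // Check-in sends check-in messages that Booking receives.
        System.out.println(sameDestination(checkin, booking, "checkInQ"));  // prints "true"
    }
}
```

The lookup in this sketch is case-sensitive, so asking for `checkinQ` instead of `checkInQ` returns no destination. That makes it a quick way to catch a misspelled channel name in one of the property files.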