How to do it...

Let's create a reactive stream communication with the Spring Cloud Stream module by performing the following steps:

  1. First, let's create the message producer, technically referred to as the SOURCE in Spring Cloud Stream terminology. Create a Spring Boot 2.0 application with all the core starter POM dependencies, such as WebFlux, Actuator, Thymeleaf, and FreeMarker. Since this project will use Spring Cloud libraries and plugins for Spring Boot 2.0, add the following Spring Cloud Finchley dependency configuration to the pom.xml file:
<dependencyManagement> 
    <dependencies> 
        <dependency> 
            <groupId>org.springframework.cloud</groupId> 
            <artifactId>spring-cloud-dependencies</artifactId> 
            <version>Finchley.M1</version> 
            <type>pom</type> 
            <scope>import</scope> 
        </dependency> 
    </dependencies> 
</dependencyManagement> 
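Finchley.M1 is a milestone (not a GA) release, so Maven may also need the Spring milestone repository declared in pom.xml. A sketch of such an entry (the id value is arbitrary):

```xml
<repositories>
    <repository>
        <id>spring-milestones</id>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>
```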
  2. Add the following starter POM, which is required to implement the reactive Spring Cloud Stream:
<dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-stream-reactive</artifactId> 
 </dependency>
  3. Since we will be using a RabbitMQ broker for the message exchange, verify that the server is installed on your machine. Also, add the following Spring Cloud Stream binder module for RabbitMQ:
<dependency> 
  <groupId>org.springframework.cloud</groupId> 
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 
</dependency>
  4. Inside the core package, org.packt.process.core, create the bootstrap class for this SOURCE application, annotated with @EnableBinding together with an interface argument that identifies the channel bindings to register with the RabbitMQ exchange broker. For this application, the default Source interface will be used:
@SpringBootApplication 
@EnableBinding(Source.class)  
public class SourceMsgBootApplication extends SpringBootServletInitializer { 
    
   // refer to sources 
} 
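For reference, the built-in Source interface that @EnableBinding registers declares a single output channel. Paraphrased from Spring Cloud Stream (this is framework code, not part of this project), it looks roughly like this, which is why the binding configured in application.properties is named output:

```java
public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}
```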
  5. Inside its src/main/resources directory, create an application.properties file that contains the same Tomcat server-related and connectivity details for the hrs database. Moreover, add the following new details that pertain to the source message channel and the type of message this channel will emit during the process. Also include the details of the RabbitMQ server that will host the messaging:
spring.rabbitmq.host=localhost 
spring.rabbitmq.port=5672 
spring.rabbitmq.username=guest 
spring.rabbitmq.password=guest 
spring.rabbitmq.requested-heartbeat=60 
 
spring.cloud.stream.bindings.output.destination=packt.cloud 
spring.cloud.stream.bindings.output.content-type=application/json 
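The URLs used later in this recipe (for example, http://localhost:9004/ch11-ipc-source/selectDept/39) assume the SOURCE application listens on port 9004 under the ch11-ipc-source context path. If those values are not already part of your common server configuration, properties along these lines (using the Spring Boot 2.0 property names) would be needed:

```properties
server.port=9004
server.servlet.context-path=/ch11-ipc-source
```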
  6. Create a @Controller that will contain two request handlers: one that sends a request with an employee ID for the verification of its profile, and one that sends a Department payload object for the consumer to save. Both handlers send their request messages to the consumer through the output channel:
@Controller 
public class MessageController { 
 
   @Autowired 
   private Source source; 
 
   @RequestMapping(method = RequestMethod.GET, 
         value = "/selectDept/{id}") 
   @ResponseBody 
   public String verifyEmployee(@PathVariable("id") String id) { 
      Message<String> result = 
            MessageBuilder.withPayload(id).build(); 
      source.output().send(result); 
      return result.getPayload(); 
   } 
 
   @RequestMapping(method = RequestMethod.GET, 
         value = "/addDept/{id}/{deptid}/{name}/") 
   @ResponseBody 
   public Department addEmployee(@PathVariable("id") Integer id, 
         @PathVariable("deptid") Integer deptid, 
         @PathVariable("name") String name) { 
 
      Department dept = new Department(); 
      dept.setId(id); 
      dept.setDeptid(deptid); 
      dept.setName(name); 
      Message<Department> result = 
            MessageBuilder.withPayload(dept).build(); 
      source.output().send(result); 
      return result.getPayload(); 
   } 
} 
  7. Save all files.
  8. Since we are done with the producer, let's now create the message consumer, or SINK, application. Create a Spring Boot 2.0 application, ch11-ipc-sink, with the same core starter POM dependencies as the producer application. Add the JDBC and JPA starter POMs for database transactions. Also, add the Spring Cloud Finchley Maven dependency configuration to pom.xml.
  9. Now create a core package for the consumer named org.packt.process.core and drop in the following bootstrap class, which enables message binding using the SINK definition:
@SpringBootApplication 
@EnableBinding(Sink.class) 
public class SinkMsgBootApplication extends SpringBootServletInitializer { 
    
   // refer to sources 
}
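Similarly, the built-in Sink interface declares a single input channel named input. Paraphrased from Spring Cloud Stream (framework code, not part of this project), it matches the input binding configured in the next step:

```java
public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}
```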
  10. In its src/main/resources directory, add an application.properties file that contains the RabbitMQ server details, the input exchange channel, and the type of message to be consumed. Also disable spring.jpa.hibernate.use-new-id-generator-mappings to follow the MySQL auto-increment procedure for primary key generation:
spring.jpa.hibernate.use-new-id-generator-mappings=false 
spring.cloud.stream.bindings.input.destination=packt.cloud 
spring.cloud.stream.bindings.input.content-type=application/json 
 
spring.rabbitmq.host=localhost 
spring.rabbitmq.port=5672 
spring.rabbitmq.username=guest 
spring.rabbitmq.password=guest 
spring.rabbitmq.requested-heartbeat=60 
# refer to the sources for the remaining settings 
The consumer and the producer must bind to one and the same destination, packt.cloud, for the message exchange to happen.
  11. Copy the JPA repository, service, and entity models from the ch10-empservice application. Drop them into this project, and apply the needed package refactoring and some syntax changes.
  12. Create an event handler class that contains handlers executed every time a request reaches the input queue. For Spring Cloud Stream to detect this bean, the class must have the @Component annotation:
@Component 
public class VerifyEmployeeService { 
 
   @Autowired 
   private DepartmentService departmentServiceImpl; 
 
   private static final Logger log = 
         LoggerFactory.getLogger(VerifyEmployeeService.class); 
 
   @ServiceActivator(inputChannel = Sink.INPUT) 
   public void validateEmployee(Integer deptId) { 
      Department dept = null; 
      try { 
         dept = departmentServiceImpl.findDeptByid(deptId); 
      } catch (Exception e) { 
         dept = new Department(); 
         dept.setName("Non-existent"); 
      } 
      log.info("{}", dept.getName()); 
   } 
 
   @ServiceActivator(inputChannel = Sink.INPUT) 
   public void addDepartment(Department dept) { 
      try { 
         departmentServiceImpl.saveDeptRec(dept); 
      } catch (Exception e) { 
         log.info("{}", e.getMessage()); 
      } 
      log.info("{}", dept.getName()); 
   } 
} 
Spring Cloud Stream has built-in message converters and mappers, which is why the SINK can consume the employee ID as an Integer object directly from the input channel. Object payloads are automatically converted to and from JSON without adding any Jackson mapper plugins.
  13. Save all files. Assuming the RabbitMQ server is running, deploy the SOURCE and SINK projects. Run both http://localhost:9004/ch11-ipc-source/addDept/777/3433/Psychology/ and http://localhost:9004/ch11-ipc-source/selectDept/395, and check the logs. Also, check whether the Psychology record has been inserted into the department table.
  14. An optional Spring Cloud Stream component, called a PROCESSOR, can be added to this recipe to act as a filter on the incoming stream, forwarding matching reactive stream data to the SINK. Let's create a new Spring Boot 2.0 application that will serve as the PROCESSOR for the preceding message exchange. Add the same core starter POM and Spring Cloud Finchley dependencies to pom.xml.
  15. Inside its core package, org.packt.process.core, create a bootstrap class that enables message binding using the default Processor interface:
@EnableBinding(Processor.class) 
@SpringBootApplication 
public class ProcessorMsgBootApplication extends SpringBootServletInitializer { 
    
   // refer to sources 
}
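The PROCESSOR also needs its own application.properties with both an input and an output binding, plus the RabbitMQ connection details. The recipe does not list this file, but a sketch would look like the following. Treat the destination values as assumptions: whether the processor's output should reuse packt.cloud or point at a separate intermediate destination depends on how you want the SINK wired downstream.

```properties
spring.cloud.stream.bindings.input.destination=packt.cloud
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.output.destination=packt.cloud
spring.cloud.stream.bindings.output.content-type=application/json

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
```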
  16. Inside the org.packt.process.core.config package, create the following listener class, which filters the employee ID payload from the producer message, checks whether it is convertible to an int, and returns 0 when it is not:
@Configuration 
public class EmpIdConverterConverter { 
 
   @StreamListener(Processor.INPUT) 
   @SendTo(Processor.OUTPUT) 
   public Integer verifyEmpString(String message) { 
      System.out.println("first"); 
      Integer empid = null; 
      try { 
         empid = Integer.parseInt(message); 
      } catch (Exception e) { 
         empid = 0; 
      } 
      return empid; 
   } 
} 
This implementation processes and forwards each message in a blocking fashion.
  17. Save all files. Deploy the SOURCE, SINK, and PROCESSOR applications. Open a browser and run http://localhost:9004/ch11-ipc-source/selectDept/39 again. Then send an invalid employee ID and observe the logs.
  18. Unlike the previous recipe, Spring Cloud Stream is a messaging framework designed for long-lived interprocess communication. To create a PROCESSOR that manages a hot stream, or continuous flow of message requests, use Reactor-based event handlers in which both incoming and outgoing messages are wrapped in Flux<T> streams. Now try replacing the previous blocking verifyEmpString() with the following reactive handler:
@StreamListener 
@Output(Processor.OUTPUT) 
public Flux<String> verifyEmpString( 
      @Input(Processor.INPUT) Flux<String> id) { 
   System.out.println("first"); 
   return id.delayElements(Duration.ofMillis(2)) 
         .log(); 
} 
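A detail worth noting here: Reactor operators such as delayElements() and log() do not mutate the source Flux; each returns a new Flux, so the operator chain must be returned (or subscribed to) for it to take effect. The same contract holds for java.util.stream, as this stdlib-only sketch (names are illustrative, not part of the recipe) shows:

```java
import java.util.List;
import java.util.stream.Collectors;

class OperatorChaining {

    // Discarding the result of map() has no effect: intermediate
    // operators build a new pipeline instead of mutating the source.
    static List<String> discarded(List<String> ids) {
        ids.stream().map(id -> "emp-" + id); // result thrown away
        return ids;                          // untransformed
    }

    // Returning the chained pipeline applies the transformation.
    static List<String> chained(List<String> ids) {
        return ids.stream()
                  .map(id -> "emp-" + id)
                  .collect(Collectors.toList());
    }
}
```
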
  19. Save all files. Deploy all three applications. Rerun http://localhost:9004/ch11-ipc-source/selectDept/39, and observe the broker exchange between the input and output queues.