How to do it...

This recipe highlights the main APIs involved in implementing the Reactive Streams specification:

  1. Create a Spring Maven project ch07. This project will consist of only native services to highlight Reactive Streams 1.x interfaces and Reactor 3.x implementation classes.
  2. Copy DispatcherServlet, SpringContextConfig, SpringDbConfig, SpringDispatcherConfig, and SpringWebinitializer from the previous chapter and update the details of these application contexts. Use a new core package named org.packt.Reactive.codes. Also reuse the Employee model and DAO classes used in the previous recipes.
  3. Configure the pom.xml file to include all the libraries used in the previous chapter, such as JUnit 4, Log4J, Spring 5.0.0.BUILD-SNAPSHOT, Servlet 3.x, JSP 2.3, and the MySQL JDBC connector.
  4. For Reactive Streams 1.0, add the following Maven dependencies in pom.xml:
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck</artifactId>
  <version>1.0.0</version>
  <scope>test</scope>
</dependency>
  5. For the Reactive implementation classes, add the following dependency from Project Reactor 3.0:
<dependency> 
  <groupId>io.projectreactor</groupId> 
  <artifactId>reactor-core</artifactId> 
  <version>3.0.7.RELEASE</version> 
</dependency> 
  6. Inside the package org.packt.Reactive.core.service, create a service interface EmployeeStreamservice that consists of the following abstract methods:
public interface EmployeeStreamservice { 
  public Publisher<Void> showThreads(); 
  public Publisher<Employee> readEmployees(); 
  public Publisher<Employee> readEmployee(Integer empId); 
  public Publisher<String> getValidEmployees(); 
} 
  7. To start, implement a Publisher<Void> that prints the name of the thread on which the stream executes. Create an implementation class named EmployeeStreamserviceImpl that implements the abstract method showThreads() using org.reactivestreams.Publisher, as shown in the following code snippet:
@Service 
public class EmployeeStreamserviceImpl  
implements EmployeeStreamservice { 
 
  @Autowired 
  private EmployeeDao employeeDaoImpl; 
 
  @Override 
  public Publisher<Void> showThreads() { 
    Runnable task = () ->{ 
      System.out.println(Thread.currentThread().getName()); 
    }; 
    Mono<Void> execThread = Mono.fromRunnable(task); 
    return execThread; 
  } 
}
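As a quick check of where such a task runs, the sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones, so it needs no extra dependencies. The fromRunnable helper and the class name are hypothetical stand-ins for illustration, not Reactor's implementation. It suggests that, unless a scheduler is involved, the task runs on whichever thread subscribes.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

class RunnablePublisherSketch {

    // Hypothetical helper: runs the task when a subscriber arrives, then
    // signals completion. It never emits an element, hence Publisher<Void>.
    static Flow.Publisher<Void> fromRunnable(Runnable task) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* nothing to emit */ }
                @Override public void cancel() { /* nothing to cancel */ }
            });
            try {
                task.run();
            } catch (Throwable t) {
                subscriber.onError(t);
                return;
            }
            subscriber.onComplete();
        };
    }

    // Subscribes on the calling thread and reports the thread the task saw.
    static String threadSeenByTask() {
        AtomicReference<String> seen = new AtomicReference<>();
        fromRunnable(() -> seen.set(Thread.currentThread().getName()))
            .subscribe(new Flow.Subscriber<Void>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Void none) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("task:   " + threadSeenByTask());
    }
}
```

In Reactor, the same observation should hold for Mono.fromRunnable() unless an operator such as subscribeOn() moves the work to a scheduler's thread.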
  8. To avoid retrieving the list of Employee records synchronously, set up data retrieval using the observer design pattern: a Publisher wraps the raw data, converts it into a stream of objects whose events can be handled asynchronously, and delivers that stream to its subscribers. Add the following method to the implementation class:
@Override 
public Publisher<Employee> readEmployees() { 
  Publisher<Employee> publishedEmployees=  
  Flux.fromIterable(employeeDaoImpl.getEmployees()); 
  return publishedEmployees; 
} 
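Note that in the method above, employeeDaoImpl.getEmployees() is evaluated as soon as readEmployees() is called; the publisher only streams the list that has already been loaded. To illustrate how a publisher hands elements of an Iterable to a subscriber according to requested demand, here is a minimal single-threaded sketch on the JDK's java.util.concurrent.Flow interfaces (Java 9+). The fromIterable and takeFirst helpers are hypothetical and much simpler than Flux.fromIterable().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class IterablePublisherSketch {

    // Hypothetical helper: emits at most as many elements as were requested,
    // and completes once the iterator is exhausted. Not thread-safe.
    static <T> Flow.Publisher<T> fromIterable(Iterable<T> source) {
        return subscriber -> {
            Iterator<T> it = source.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    long remaining = n;
                    while (!done && remaining-- > 0 && it.hasNext()) {
                        subscriber.onNext(it.next());
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    // Requests `demand` elements once and records every signal received.
    static List<String> takeFirst(List<String> names, long demand) {
        List<String> received = new ArrayList<>();
        fromIterable(names).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(String name) { received.add(name); }
            @Override public void onError(Throwable t) { received.add("error"); }
            @Override public void onComplete() { received.add("done"); }
        });
        return received;
    }

    public static void main(String[] args) {
        List<String> names = List.of("Ann", "Bob", "Cy");
        System.out.println(takeFirst(names, 2));              // limited demand
        System.out.println(takeFirst(names, Long.MAX_VALUE)); // unbounded demand
    }
}
```

With a demand of 2 the subscriber sees only two names and no completion signal, while Long.MAX_VALUE (as used in the recipe's tests) drains the whole list and ends with completion.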
  9. A publisher does not emit anything until a subscriber subscribes to it. A Subscriber is a recipient and can be a simple service, a @Controller, or any module that needs the published stream. At this point, create a test class TestEmployeeStreamservice that creates org.reactivestreams.Subscriber instances to run showThreads() and readEmployees() as follows:
@RunWith(SpringJUnit4ClassRunner.class) 
@WebAppConfiguration 
@ContextConfiguration(classes = { SpringDbConfig.class,  
SpringDispatcherConfig.class }) 
public class TestEmployeeStreamservice { 
 
  @Autowired 
  private EmployeeStreamservice employeeStreamserviceImpl; 
 
  @Test 
  public void testStreamThread(){ 
    Subscriber<Void> mySubscription = new Subscriber<Void>() { 
 
      @Override 
      public void onComplete() { 
        System.out.println("---End of Stream --"); 
      } 
 
      @Override 
      public void onError(Throwable e) { 
        System.out.println("--Transmission Error --"); 
      } 
       
      @Override 
      public void onSubscribe(Subscription subs) {} 
 
      @Override 
      public void onNext(Void none) {  } 
 
    };
    employeeStreamserviceImpl.showThreads().subscribe(mySubscription);
  }

  @Test
  public void testReadEmployees(){
    Subscriber<Employee> mySubscription = new Subscriber<Employee>() {

      @Override
      public void onComplete() {
        System.out.println("*-------End of Stream -------");
      }

      @Override
      public void onError(Throwable e) {
        System.out.println("*----Transmission Error ----");
      }

      @Override
      public void onNext(Employee emp) {
        System.out.format("%d %s %s %d\n", emp.getId(),
          emp.getFirstName(), emp.getLastName(), emp.getAge());
      }

      @Override
      public void onSubscribe(Subscription subs) {
        subs.request(Long.MAX_VALUE);
      }
    };
    employeeStreamserviceImpl.readEmployees().subscribe(mySubscription);
  }
}
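The order of the four Subscriber callbacks can be observed without a database or Spring context. The following sketch, an illustration only, uses the JDK's SubmissionPublisher (Java 9+), which delivers items asynchronously through java.util.concurrent.Flow, the JDK counterpart of the org.reactivestreams interfaces. The recordSignals helper is a hypothetical name, and a latch is needed because delivery happens on another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SubscriberLifecycleSketch {

    // Publishes the items asynchronously and returns the signals the
    // subscriber received, in the order in which they arrived.
    static List<String> recordSignals(List<String> items) {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subs) {
                    signals.add("onSubscribe");
                    subs.request(Long.MAX_VALUE); // without demand, no onNext arrives
                }

                @Override
                public void onNext(String item) {
                    signals.add("onNext:" + item);
                }

                @Override
                public void onError(Throwable e) {
                    signals.add("onError");
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete");
                    finished.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered

        try {
            // Wait for the terminal signal, since delivery is asynchronous.
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(signals);
    }

    public static void main(String[] args) {
        System.out.println(recordSignals(List.of("a", "b")));
    }
}
```

The subscriber always receives onSubscribe first, then zero or more onNext calls, then exactly one terminal signal (onComplete or onError).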
  10. Retrieving a single object from a service method can also be handled by a publisher, but the service transaction must be safe to run under multithreading or concurrency. The java.util.concurrent.Callable interface is used to wrap the transaction into a deferred task whose result is emitted as a single stream element. The following code snippet is an implementation that retrieves an employee record in asynchronous mode:
@Override 
public Publisher<Employee> readEmployee(Integer empId) { 
  Callable<Employee> task = () -> employeeDaoImpl.getEmployee(empId); 
  Publisher<Employee> publishedEmployee = Mono.fromCallable(task); 
  return publishedEmployee; 
} 
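One consequence of wrapping the DAO call in a Callable is that the call is deferred: nothing is fetched until a subscriber asks for it. The sketch below illustrates that idea with the JDK's java.util.concurrent.Flow interfaces (Java 9+). The fromCallable helper is a hypothetical, simplified stand-in and not Reactor's Mono.fromCallable() implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

class CallablePublisherSketch {

    // Hypothetical helper: invokes the task on the first request, emits its
    // result as the only element, then completes. A failure becomes onError.
    static <T> Flow.Publisher<T> fromCallable(Callable<T> task) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                T value;
                try {
                    value = task.call();
                } catch (Exception e) {
                    subscriber.onError(e);
                    return;
                }
                if (value != null) {
                    subscriber.onNext(value); // null is never emitted
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Shows that the task is not called until a subscriber requests data.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        StringBuilder log = new StringBuilder();
        Flow.Publisher<String> publisher =
            fromCallable(() -> "emp-" + calls.incrementAndGet());
        log.append("calls before subscribe=").append(calls.get());

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String v) { log.append(", onNext=").append(v); }
            @Override public void onError(Throwable t) { log.append(", error"); }
            @Override public void onComplete() { log.append(", complete"); }
        });
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This differs from readEmployees() in the recipe, where the DAO is queried when the method is called rather than when a subscriber arrives.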
  11. Add the following test case to our TestEmployeeStreamservice that will create a subscriber to readEmployee(Integer empId):
@Test 
public void testReadSingleEmployee(){ 
  Subscriber<Employee> mySubscription = new Subscriber<Employee>() { 
 
    @Override 
    public void onComplete() { 
      System.out.println("-----End of Stream -----"); 
    } 
 
    @Override 
    public void onError(Throwable e) { 
      System.out.println("--Transmission Error ----"); 
    } 
 
    @Override 
    public void onNext(Employee emp) { 
      System.out.format("%d %s %s %d\n", emp.getId(),
        emp.getFirstName(), emp.getLastName(), emp.getAge());
    } 
 
    @Override
    public void onSubscribe(Subscription subs) {
      subs.request(Long.MAX_VALUE);
    }
  };
  employeeStreamserviceImpl.readEmployee(14).subscribe(mySubscription);
}
  12. A stream returned by a publisher can pass through several operators that work asynchronously on live data to produce the final output stream. If no operator throws a java.lang.Error or java.lang.Exception, the publisher emits the desired values and then signals completion. Otherwise, it emits an error signal, or a fallback value if one is configured, and ends the transmission gracefully. The following implementation of getValidEmployees() computes a gradient from each employee's age and returns the employee names as a stream of String values:
@Override 
public Publisher<String> getValidEmployees(){ 
  Function<Employee, String> validEmps = (e) -> { 
    double ageGradient = (int) ( 1 / (e.getAge() - 22)); 
    if (ageGradient == 0){ 
      return e.getFirstName() + " " + e.getLastName(); 
    }else{ 
      return null; 
    } 
  }; 
       
  Runnable completion = () ->{ 
    System.out.println("**** End of List*****"); 
  }; 
 
  Publisher<String> publishedEmployees = Flux
    .fromIterable(employeeDaoImpl.getEmployees())
    .map(validEmps)
    .onErrorReturn("Invalid Employee")
    .doOnComplete(completion);
  return publishedEmployees;
}
The preceding publisher uses map(), which applies a function to each element from fromIterable() in sequence to generate another stream. In this snippet, the Function concatenates the employee's first and last names when the age gradient equals 0; otherwise it returns null. Reactive Streams does not allow null elements, so a null result triggers an error in the publisher.
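To make the arithmetic and the error path concrete, the following plain-Java sketch reproduces the integer division from the snippet and emulates, under simplifying assumptions, what map() followed by onErrorReturn() is expected to do: stop at the first failing element and emit the fallback instead. The names nameIfValid, mapWithFallback, and demo are hypothetical, and no Reactor classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ValidEmployeesSketch {

    // Same arithmetic as the recipe. Because this is integer division, the
    // gradient is 0 for most ages, 1 for age 23, -1 for age 21, and age 22
    // divides by zero and throws ArithmeticException.
    static String nameIfValid(String name, int age) {
        int ageGradient = 1 / (age - 22);
        return ageGradient == 0 ? name : null;
    }

    // Emulation only: maps elements in order; on the first exception or null
    // result it adds the fallback and stops, like an error ending the stream.
    static <T, R> List<R> mapWithFallback(List<T> source, Function<T, R> mapper, R fallback) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            try {
                R mapped = mapper.apply(item);
                if (mapped == null) {
                    throw new NullPointerException("mapper returned null");
                }
                out.add(mapped);
            } catch (RuntimeException e) {
                out.add(fallback);
                break; // elements after the failure are never processed
            }
        }
        return out;
    }

    // Builds placeholder names such as "age30" so the outcome is easy to read.
    static List<String> demo(int... ages) {
        List<Integer> source = new ArrayList<>();
        for (int age : ages) {
            source.add(age);
        }
        return mapWithFallback(source, age -> nameIfValid("age" + age, age), "Invalid Employee");
    }

    public static void main(String[] args) {
        System.out.println(demo(30, 40));     // no failure
        System.out.println(demo(30, 22, 40)); // division by zero at age 22
        System.out.println(demo(30, 23, 40)); // null result at age 23
    }
}
```

Under these assumptions, an employee aged 21 or 23 ends the stream just as an employee aged 22 does, because the function returns null for them, and any records after the failing one are not emitted.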
  13. Add the following test case to show the result of executing the risky getValidEmployees(). Be sure to have an employee record with age 22, which causes a division by zero and triggers an error:
@Test 
public void testGetValidEmployees(){ 
  Subscriber<String> mySubscription = new Subscriber<String>() { 
 
    @Override 
    public void onComplete() { } 
 
    @Override 
    public void onError(Throwable e) {  } 
 
    @Override 
    public void onNext(String name) { 
      System.out.format("Employee: %s\n", name);
    }  
 
    @Override 
    public void onSubscribe(Subscription subs) { 
      subs.request(Long.MAX_VALUE); 
    } 
  }; 
       
  employeeStreamserviceImpl.getValidEmployees().subscribe(mySubscription);
}
Because the publisher already handles failures with onErrorReturn() and reports completion with doOnComplete(), the subscriber can leave its onError() and onComplete() overrides empty.
  14. Save all files. Execute all the test cases in TestEmployeeStreamservice and observe how the Publisher-Subscriber model works in the Reactive Streams specification.