How to do it...

This last recipe shows how Spring 5 can integrate with other Reactive Streams implementations such as RxJava 2.0:

  1. To let the Spring 5 application work with RxJava, add the following Maven dependency to the pom.xml configuration:
<dependency>
   <groupId>io.reactivex.rxjava2</groupId>
   <artifactId>rxjava</artifactId>
   <version>2.1.0</version>
</dependency>
  1. Create a service interface EmployeeRxJavaService that declares the following template methods, which together cover the main RxJava types:
public interface EmployeeRxJavaService { 
   public Observable<Employee> getEmployeesRx(); 
   public Single<Employee> getEmployeeRx(int empid); 
   public Flowable<String> getFirstNamesRx(); 
   public Flowable<String> getEmpNamesRx(); 
   public Flowable<String> getEmpNamesParallelRx(); 
   public Flowable<String> combinedStreamRx(List<String> others);
   public ConnectableObservable<String> freeFlowEmps(); 
} 
  1. Observable<T> in RxJava plays a role similar to Publisher<T> in Reactor Core: it pushes data to its subscribers, following the observer design pattern. Unlike Flowable<T>, however, Observable<T> does not implement the Reactive Streams Publisher<T> interface and offers no backpressure. The following service method, getEmployeesRx(), converts the employee records into a stream of data using Observable<T>:
@Service 
public class EmployeeRxJavaServiceImpl
      implements EmployeeRxJavaService {
    
   @Autowired 
   private EmployeeDao employeeDaoImpl; 
 
   @Override 
   public Observable<Employee> getEmployeesRx() { 
      Observable<Employee> publishedEmployees =  
         Observable.fromIterable(employeeDaoImpl.getEmployees()); 
      return publishedEmployees; 
   }
} 
  1. To subscribe to an Observable<T>, either pass an io.reactivex.functions.Consumer<T> or implement the callbacks of Observer<T>. RxJava 2 ships its own set of functional interfaces in io.reactivex.functions and, unlike Reactor Core, does not use the Java 1.8 java.util.function types in its API. The following test class shows both ways of subscribing to an observable:
@RunWith(SpringJUnit4ClassRunner.class) 
@WebAppConfiguration 
@ContextConfiguration(classes = { SpringDbConfig.class,
      SpringDispatcherConfig.class })
public class TestEmployeeRxJavaService { 
    
   @Autowired 
   private EmployeeRxJavaService employeeRxJavaServiceImpl; 
    
   @Test 
   public void testEmployeeData(){ 
      Observer<Employee> mySubscription =
         new Observer<Employee>() {
 
         @Override 
         public void onComplete() { 
            System.out.println("subscription completed"); 
         } 
 
         @Override 
         public void onError(Throwable ex) { 
            System.out.println("problems encountered: "
               + ex.getMessage());
         } 
 
         @Override 
         public void onNext(Employee emp) { 
            System.out.format("Employee: %s\n",
               emp.getEmpId());
         } 
 
         @Override 
         public void onSubscribe(Disposable arg0) { 
            System.out.println("subscription started"); 
         } 
      }; 
      employeeRxJavaServiceImpl.getEmployeesRx()
         .subscribe(mySubscription);
   } 
    
   @Test 
   public void testEmployeeDataConsumer(){ 
      Consumer<Employee> consume = (emp) ->{ 
         System.out.println(emp.getFirstName()); 
      }; 
      employeeRxJavaServiceImpl.getEmployeesRx()
         .subscribe(consume);
   } 
} 
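The order in which the four Observer<T> callbacks fire can be sketched without RxJava on the classpath. The following is a plain-JDK illustration only: SimpleObserver, emit(), and trace() are hypothetical names, not RxJava types, and the sketch ignores disposal and threading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the four Observer<T> callbacks; not an RxJava type.
interface SimpleObserver<T> {
    void onSubscribe();
    void onNext(T item);
    void onError(Throwable ex);
    void onComplete();
}

class ObserverContractSketch {

    // Pushes every element to the observer, then signals completion.
    // A runtime exception ends the sequence with onError instead of onComplete.
    static <T> void emit(Iterable<T> source, SimpleObserver<T> observer) {
        observer.onSubscribe();
        try {
            for (T item : source) {
                observer.onNext(item);
            }
        } catch (RuntimeException ex) {
            observer.onError(ex);
            return;
        }
        observer.onComplete();
    }

    // Records the order in which the callbacks fire for the given items.
    static List<String> trace(List<String> items) {
        final List<String> log = new ArrayList<>();
        emit(items, new SimpleObserver<String>() {
            @Override public void onSubscribe() { log.add("subscribed"); }
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable ex) { log.add("error:" + ex.getMessage()); }
            @Override public void onComplete() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList("Anna", "Ben")));
    }
}
```

A sequence ends with either onComplete() or onError(), never both, which is why the test observer in this step prints "subscription completed" only when no error occurred.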
  1. To create a stream that emits exactly one value or an error, use Single<T>. It is not a subclass of Observable<T>, but it behaves much like a Mono<T> publisher and signals its subscriber through onSuccess() or onError(). The following getEmployeeRx() method emits an Employee record, or an error, by executing a Callable task when it is subscribed to:
@Override
public Single<Employee> getEmployeeRx(int empid) {
   Callable<Employee> task =
      () -> employeeDaoImpl.getEmployee(empid);
   Single<Employee> emp = Single.fromCallable(task);
   return emp;
}
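The point of wrapping the DAO call in a Callable is that the lookup is deferred: building the task does not touch the database, and the work runs each time the task is invoked. The plain-JDK sketch below illustrates only that deferral; DeferredLookupSketch and the fake "employee-7" result are assumptions for illustration, and the counter stands in for the call to employeeDaoImpl.getEmployee(empid).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredLookupSketch {

    // Returns the number of lookups observed after creating the task,
    // after calling it once, and after calling it a second time.
    static int[] countLookups() {
        final AtomicInteger lookups = new AtomicInteger();
        Callable<String> task = () -> {
            lookups.incrementAndGet();   // stands in for the DAO lookup
            return "employee-7";         // fake result, for illustration only
        };
        int afterCreate = lookups.get(); // still 0: nothing has run yet
        try {
            task.call();
            int afterFirstCall = lookups.get();
            task.call();
            int afterSecondCall = lookups.get();
            return new int[] { afterCreate, afterFirstCall, afterSecondCall };
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        int[] counts = countLookups();
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);
    }
}
```

Single.fromCallable() treats its Callable the same way: the task runs on subscription, once per subscriber, and an exception thrown by the task is delivered through onError().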
  1. Where Reactor Core has Flux<T>, RxJava has Flowable<T>, which supports backpressure, transformation, grouping, multithreading, and concurrency operations. The getFirstNamesRx() method streams the Employee records, extracts each first name, and converts it to uppercase. Because flatMap() accepts any Reactive Streams Publisher<T>, the mapping function can return a Reactor Mono<String>:
@Override
public Flowable<String> getFirstNamesRx() {
   Function<Employee, Publisher<String>> firstNames =
      (emp) -> Mono.just(emp.getFirstName())
                   .map(String::toUpperCase);
   Flowable<String> emps =
      Flowable.fromIterable(employeeDaoImpl.getEmployees())
              .flatMap(firstNames);
   return emps;
}
  1. RxJava uses io.reactivex.schedulers.Schedulers to create the different io.reactivex.Scheduler instances on which observables and observers run. Calling subscribeOn() selects the thread on which the source is subscribed to and emits its items. Calling observeOn() selects the thread on which the operators after it, and the Observer<T> callbacks or lambda expressions, are executed:
@Override
public Flowable<String> getEmpNamesRx() {
   Scheduler observerWorker = Schedulers.single();
   Scheduler subscriberWorker = Schedulers.newThread();
   Function<Employee, String> names =
      (emp) -> emp.getFirstName() + emp.getLastName();
   Flowable<String> emps = Flowable
      .fromIterable(employeeDaoImpl.getEmployees())
      .map(names)
      .observeOn(observerWorker)
      .subscribeOn(subscriberWorker);
   return emps;
}
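The split between the thread that produces values and the thread that consumes them can be illustrated with plain JDK executors. This is an analogy only, not how RxJava schedulers are implemented: ThreadHopSketch, named(), and the thread names are hypothetical, with one executor playing the role of the subscribeOn() worker and the other the observeOn() worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    // Creates a single-thread executor whose worker thread has a fixed name.
    static ExecutorService named(final String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Produces a value on one thread and consumes it on another.
    // Returns { name of producing thread, name of consuming thread }.
    static String[] run() {
        ExecutorService sourceWorker = named("source-worker");      // role of subscribeOn()
        ExecutorService consumerWorker = named("consumer-worker");  // role of observeOn()
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), sourceWorker)
                .thenApplyAsync(
                    producedOn -> new String[] { producedOn, Thread.currentThread().getName() },
                    consumerWorker)
                .join();
        } finally {
            sourceWorker.shutdown();
            consumerWorker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("produced on " + names[0] + ", consumed on " + names[1]);
    }
}
```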
  1. Work can be moved onto a pool of computation threads by passing Schedulers.computation() to subscribeOn(), as shown in the following snippet. Note that subscribeOn() alone runs the whole sequence on one thread of that pool; to process items in parallel, the per-item work has to be split up, for example with flatMap() and an inner subscribeOn():
@Override
public Flowable<String> getEmpNamesParallelRx() {
   Function<Employee, String> names = (emp) -> {
      System.out.println("map thread: "
         + Thread.currentThread().getName());
      return emp.getFirstName().charAt(0) +
         emp.getLastName();
   };

   Flowable<String> parallelEmpFlux =
      Flowable.fromIterable(employeeDaoImpl.getEmployees())
         .map(names)
         .subscribeOn(Schedulers.computation());
   return parallelEmpFlux;
}
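What per-item parallel work on a thread pool looks like can be sketched with the JDK alone. The example below is an assumption-laden illustration, not RxJava code: ParallelNamesSketch and shortNames() are hypothetical, each person is a {firstName, lastName} array instead of an Employee, and a fixed thread pool stands in for the computation scheduler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelNamesSketch {

    // Builds "first initial + last name" for every {firstName, lastName} pair,
    // submitting one task per element to a fixed-size pool.
    static List<String> shortNames(List<String[]> people) {
        int threads = Math.max(2, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (final String[] person : people) {
                tasks.add(() -> person[0].charAt(0) + person[1]);
            }
            // invokeAll returns the futures in the same order as the tasks,
            // so the result order matches the input order.
            List<String> result = new ArrayList<>();
            for (Future<String> future : pool.invokeAll(tasks)) {
                result.add(future.get());
            }
            return result;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(shortNames(Arrays.asList(
            new String[] { "Anna", "Lee" },
            new String[] { "Ben", "Cruz" })));
    }
}
```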
  1. RxJava also has its own operators for transforming and manipulating streams, such as sorted(), which sorts the stream elements in their natural order, and zipWith(), which pairs the elements of two sequences. Create the following method, which uses both operators:
@Override
public Flowable<String> combinedStreamRx(List<String> others) {
   Function<Employee, String> names =
      (emp) -> emp.getFirstName() + "---validated";
   Flowable<String> zipNames = Flowable
      .fromIterable(employeeDaoImpl.getEmployees())
      .map(names)
      .sorted()
      .zipWith(others, (str1, str2) ->
         String.format("%s. %s", str1, str2));
   return zipNames;
}
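The result of sorting and then zipping can be worked through on plain lists. The sketch below mirrors the pipeline with JDK collections only; SortAndZipSketch and combine() are hypothetical names, and the first names are passed in directly instead of being read from Employee records. It also shows that zipping stops as soon as the shorter of the two sequences runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SortAndZipSketch {

    // Tags each first name, sorts the tagged names, then pairs them
    // position by position with the entries of 'others'.
    static List<String> combine(List<String> firstNames, List<String> others) {
        List<String> sorted = new ArrayList<>();
        for (String name : firstNames) {
            sorted.add(name + "---validated");
        }
        Collections.sort(sorted);

        // Zipping produces as many pairs as the shorter input has elements.
        int pairs = Math.min(sorted.size(), others.size());
        List<String> zipped = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            zipped.add(String.format("%s. %s", sorted.get(i), others.get(i)));
        }
        return zipped;
    }

    public static void main(String[] args) {
        System.out.println(combine(
            Arrays.asList("Zoe", "Adam", "Mia"),
            Arrays.asList("first", "second")));
    }
}
```

With three names and two entries in others, only two pairs are produced; the third sorted name is dropped.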
  1. Just like Reactor Core, RxJava can also generate hot streams. A ConnectableObservable<T> does not start emitting when subscribers register; it starts only when connect() is called, so several subscribers can be attached first and then receive the same emissions. Create the following method, which generates a ConnectableObservable<T> from a cold stream of Employee records:
@Override
public ConnectableObservable<String> freeFlowEmps() {
   List<String> rosterNames = new ArrayList<>();
   Function<Employee, String> familyNames =
      (emp) -> emp.getLastName().toUpperCase();
   ConnectableObservable<String> flowyNames =
      Observable
         .fromIterable(employeeDaoImpl.getEmployees())
         .map(familyNames).cache()
         .publish();
   flowyNames.subscribe(System.out::println);
   flowyNames.subscribe(rosterNames::add);

   return flowyNames;
}
  1. Lastly, create a test method that executes the preceding method. Calling connect() starts the emission to the subscribers registered inside freeFlowEmps():
@Test
public void testConnectFluxProcessor() {
   employeeRxJavaServiceImpl.freeFlowEmps().connect();
}
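The "register first, emit on connect()" behavior can be sketched with a few lines of plain Java. ConnectableSketch is a hypothetical, much simplified stand-in, not an RxJava type: it is synchronous and ignores errors, completion, and disposal. It only shows that subscribers receive nothing until connect() is called and that all of them then see the same items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a connectable source; not an RxJava type.
class ConnectableSketch<T> {

    private final List<T> source;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ConnectableSketch(List<T> source) {
        this.source = source;
    }

    // Registers a subscriber; nothing is emitted yet.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Starts the emission: every item goes to every registered subscriber.
    void connect() {
        for (T item : source) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    // Returns how many items one subscriber has collected
    // before and after connect() is called.
    static int[] receivedAroundConnect(List<String> names) {
        List<String> roster = new ArrayList<>();
        ConnectableSketch<String> flow = new ConnectableSketch<>(names);
        flow.subscribe(System.out::println);
        flow.subscribe(roster::add);
        int before = roster.size();
        flow.connect();
        return new int[] { before, roster.size() };
    }
}
```

This mirrors the test method above: the two subscriptions made inside freeFlowEmps() stay idle until the test calls connect().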