How to do it...

This last recipe shows how Spring 5 can integrate with other Reactive Streams implementations, such as RxJava 2.0:

  1. In order for our Spring 5 platform to work perfectly with RxJava, add the following Maven dependencies to the pom.xml configuration:
  1. Create a service interface, EmployeeRxJavaService, that declares the following methods, one for each RxJava feature covered in this recipe:
public interface EmployeeRxJavaService { 
   public Observable<Employee> getEmployeesRx(); 
   public Single<Employee> getEmployeeRx(int empid); 
   public Flowable<String> getFirstNamesRx(); 
   public Flowable<String> getEmpNamesRx(); 
   public Flowable<String> getEmpNamesParallelRx(); 
   public Flowable<String> combinedStreamRx(List<String> others); 
   public ConnectableObservable<String> freeFlowEmps(); 
}
  1. The counterpart of the Publisher<T> used by Reactor Core is Observable<T> in RxJava. Like a Publisher<T>, an Observable<T> pushes data to its subscribers, following the observer design pattern. Note that in RxJava 2, Observable<T> does not support backpressure and does not implement the Reactive Streams Publisher<T> interface; Flowable<T> does. The following service method, getEmployeesRx(), converts employee records into a stream of data using Observable<T>:
public class EmployeeRxJavaServiceImpl  
implements EmployeeRxJavaService { 
   private EmployeeDao employeeDaoImpl; 
   public Observable<Employee> getEmployeesRx() { 
      Observable<Employee> publishedEmployees =  
      return publishedEmployees; 
  1. To subscribe to an Observable<T>, either pass an io.reactivex.functions.Consumer<T> to subscribe() or implement the callbacks of Observer<T>. RxJava 2 ships its own set of functional interfaces in the io.reactivex.functions package instead of reusing the java.util.function types of Java 1.8 that Reactor Core relies on. The following test class shows both ways of subscribing to an observable:
@ContextConfiguration(classes = { SpringDbConfig 
      .class, SpringDispatcherConfig.class }) 
public class TestEmployeeRxJavaService { 
   private EmployeeRxJavaService employeeRxJavaServiceImpl; 
   public void testEmployeeData(){ 
      Observer<Employee> mySubscription =  
new Observer<Employee>() { 
         public void onComplete() { 
            System.out.println("subscription completed"); 
         public void onError(Throwable ex) { 
            System.out.println("problems encountered"  
+ ex.getMessage()); 
         public void onNext(Employee emp) { 
            System.out.format("Employee: %s 
         public void onSubscribe(Disposable arg0) { 
            System.out.println("subscription started"); 
   public void testEmployeeDataConsumer(){ 
      Consumer<Employee> consume = (emp) ->{ 
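The order in which an Observer<T> receives its callbacks (onSubscribe() first, then zero or more onNext() calls, then either onComplete() or onError()) can be sketched without any RxJava dependency by using the JDK's own java.util.concurrent.Flow interfaces (Java 9 and later). This is only an illustration of the contract, not RxJava code: ListPublisher and RecordingSubscriber are hypothetical names, and plain strings stand in for the Employee records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class ObserverContractSketch {

    // Hypothetical synchronous publisher that replays a fixed list to each subscriber.
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) {
            this.items = List.copyOf(items);
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit at most n items, then complete once the list is exhausted.
                    for (long i = 0; i < n && !done && next < items.size(); i++) {
                        subscriber.onNext(items.get(next++));
                    }
                    if (!done && next == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Hypothetical subscriber that records every callback it receives, in order.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("subscription started");
            subscription.request(Long.MAX_VALUE); // ask for every item
        }

        @Override
        public void onNext(String item) {
            events.add("Employee: " + item);
        }

        @Override
        public void onError(Throwable ex) {
            events.add("problems encountered: " + ex.getMessage());
        }

        @Override
        public void onComplete() {
            events.add("subscription completed");
        }
    }

    // Subscribes a recording subscriber and returns the callbacks it saw.
    static List<String> run(List<String> names) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new ListPublisher(names).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        run(List.of("Ann", "Bob")).forEach(System.out::println);
    }
}
```

The RxJava Observer<T> follows the same sequence, except that onSubscribe() receives a Disposable rather than a Subscription, so there is no request() call and no backpressure.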
  1. To create a stream that emits exactly one value or an error, use Single<T>. It is not a subclass of Observable<T>, but it resembles a Mono<T> publisher, except that it never completes empty: it signals either onSuccess() or onError(). The following getEmployeeRx() method emits an Employee record or an error by running a Callable task:
public Single<Employee> getEmployeeRx(int empid) { 
   Callable<Employee> task =  
      () -> employeeDaoImpl.getEmployee(empid); 
   Single<Employee> emp = Single.fromCallable(task); 
   return emp; 
}
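Two properties of a Single<T> built from a Callable are easy to miss: the task runs only when someone subscribes, and the subscriber sees exactly one of the two outcomes. The following is a minimal JDK-only sketch of that behavior, not the RxJava implementation. LazySingle is a hypothetical class, and strings stand in for Employee records. In RxJava 2, a Callable that returns null makes the Single signal a NullPointerException through onError(), and the sketch mirrors that rule.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

class LazySingleSketch {

    // Hypothetical stand-in for a Single created from a Callable.
    static final class LazySingle<T> {
        private final Callable<T> task;

        LazySingle(Callable<T> task) {
            this.task = task; // nothing runs yet
        }

        // Runs the task now and reports exactly one outcome.
        void subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
            T value;
            try {
                value = task.call();
            } catch (Exception ex) {
                onError.accept(ex);
                return;
            }
            if (value == null) {
                // Mirrors RxJava 2, which does not allow null items.
                onError.accept(new NullPointerException("The callable returned a null value"));
                return;
            }
            onSuccess.accept(value);
        }
    }

    // Subscribes to the task and renders the single outcome as text.
    static String describe(Callable<String> task) {
        StringBuilder out = new StringBuilder();
        new LazySingle<>(task).subscribe(
                value -> out.append("success: ").append(value),
                error -> out.append("error: ").append(error.getMessage()));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> "Ann"));
        System.out.println(describe(() -> { throw new IllegalStateException("no such id"); }));
    }
}
```

If the DAO returns null for an unknown empid, a subscriber to getEmployeeRx() should therefore expect the error callback rather than an empty result.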
  1. Where Reactor Core has Flux<T>, RxJava has Flowable<T>, which supports backpressure along with transformation, grouping, and multithreading operations. The getFirstNamesRx() method turns the employee records into a stream, extracts each first name, and converts the resulting strings to uppercase:
public Flowable<String> getFirstNamesRx() { 
      Function<Employee, Publisher<String>> firstNames =  
(emp) -> Mono.just(emp.getFirstName()) 
Flowable<String> emps =  
      return emps; 
  1. RxJava uses the io.reactivex.schedulers.Schedulers factory class to obtain io.reactivex.Scheduler instances for observables and observers. Invoke subscribeOn() to choose the thread on which the subscription happens and on which the source emits its items. Invoke observeOn() to choose the thread on which the downstream operators and the Observer<T> callbacks or lambda expressions run:
public Flowable<String> getEmpNamesRx() { 
      Scheduler observerWorker = Schedulers.single(); 
      Scheduler subscriberWorker = Schedulers.newThread(); 
      Function<Employee, String> names =  
(emp) -> emp.getFirstName() + emp.getLastName(); 
      Flowable<String> emps = Flowable 
      return emps; 
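The hand-off between the thread that produces the data and the thread that consumes it can be sketched with plain JDK executors. This is an analogy rather than RxJava code: CompletableFuture stands in for the stream, the first executor plays the role of the scheduler passed to subscribeOn(), and the second plays the role of the scheduler passed to observeOn(). The thread names are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHandOffSketch {

    // Returns { thread that produced the value, thread that consumed it }.
    static String[] run() {
        // Plays the role of the subscribeOn() scheduler: the source runs here.
        ExecutorService sourceWorker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "source-worker"));
        // Plays the role of the observeOn() scheduler: callbacks run here.
        ExecutorService observerWorker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observer-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), sourceWorker)
                    .thenApplyAsync(
                            producedOn -> new String[] { producedOn, Thread.currentThread().getName() },
                            observerWorker)
                    .join();
        } finally {
            sourceWorker.shutdown();
            observerWorker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on: " + threads[0]);
        System.out.println("consumed on: " + threads[1]);
    }
}
```

Printing Thread.currentThread().getName() in the same way inside the RxJava operators and subscriber callbacks is a simple check of which scheduler each stage actually runs on.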
  1. Most RxJava operations can be parallelized by running them on a thread pool. Schedulers.computation() provides such a pool, sized by default to the number of available processors, as shown in the following code snippet:
public Flowable<String> getEmpNamesParallelRx() { 
      Function<Employee, String> names = (emp) ->{ 
         System.out.println("flatMap thread: "  
+ Thread.currentThread().getName()); 
return emp.getFirstName().charAt(0) +  
      Flowable<String> parallelEmpFlux =  
      return parallelEmpFlux; 
  1. RxJava also has its own operators for transforming and manipulating streams, such as sorted(), which sorts stream elements by their natural order, and zipWith(), which pairs the elements of one stream with those of another source. Create the following method, which uses both operators:
public Flowable<String> combinedStreamRx(List<String> others) { 
      Function<Employee, String> names =  
(emp) -> emp.getFirstName() + "---validated"; 
      Flowable<String> zipNames = Flowable 
          .zipWith(others,(str1, str2) ->  
String.format("%s. %s", str1, str2)); 
      return zipNames; 
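What sorted() followed by zipWith() produces can be shown on plain lists. This is a JDK-only sketch of the semantics, not the RxJava operators themselves: sortThenZip is a hypothetical helper and the sample names are invented. As with zipWith(), elements are paired position by position, and the result is only as long as the shorter input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

class SortedZipSketch {

    // Sorts the left list by natural order, then pairs it with the right list.
    static <A extends Comparable<A>, B, R> List<R> sortThenZip(
            List<A> left, List<B> right, BiFunction<A, B, R> zipper) {
        List<A> sorted = new ArrayList<>(left);
        Collections.sort(sorted);
        // Pairing stops as soon as either side runs out of elements.
        int pairs = Math.min(sorted.size(), right.size());
        List<R> result = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            result.add(zipper.apply(sorted.get(i), right.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> zipped = sortThenZip(
                List.of("Zoe---validated", "Ann---validated"),
                List.of("first", "second", "third"),
                (str1, str2) -> String.format("%s. %s", str1, str2));
        zipped.forEach(System.out::println);
    }
}
```

Here the third element of the second list is dropped because the sorted names run out first. The same happens in combinedStreamRx() when others and the employee stream differ in length.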
  1. Just like Reactor Core, RxJava provides operators that generate hot streams. A hot stream emits regardless of how many subscribers it has. A ConnectableObservable<T> does not start emitting when an observer subscribes; emission starts only when connect() is called, so several observers can subscribe first and then all receive the same items. Create the following method, which generates a ConnectableObservable<T> from a cold stream of Employee records:
public ConnectableObservable<String> freeFlowEmps() { 
       List<String> rosterNames = new ArrayList<>(); 
       Function<Employee, String> familyNames =  
(emp) -> emp.getLastName().toUpperCase(); 
       ConnectableObservable<String> flowyNames =  
      return flowyNames; 
  1. Lastly, create a test method that will execute the preceding method:
public void testConnectFluxProcessor(){ 
   employeeRxJavaServiceImpl.freeFlowEmps().connect(); 
}
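The relationship between subscribing and connecting can be sketched in a few lines of plain Java. This is a simplified, single-threaded model and not the RxJava class: ConnectableSource is a hypothetical name and the family names are sample data. Subscribing only registers an observer, and connect() is what starts the emission to every observer registered so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConnectableSketch {

    // Hypothetical model of a connectable source.
    static final class ConnectableSource<T> {
        private final List<T> items;
        private final List<Consumer<? super T>> observers = new ArrayList<>();

        ConnectableSource(List<T> items) {
            this.items = List.copyOf(items);
        }

        // Registers an observer; nothing is emitted yet.
        void subscribe(Consumer<? super T> observer) {
            observers.add(observer);
        }

        // Starts the emission: every item goes to all registered observers.
        void connect() {
            for (T item : items) {
                for (Consumer<? super T> observer : observers) {
                    observer.accept(item);
                }
            }
        }
    }

    // Subscribes two observers, connects, and returns what happened in order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ConnectableSource<String> names = new ConnectableSource<>(List.of("SMITH", "JONES"));
        names.subscribe(name -> log.add("first: " + name));
        names.subscribe(name -> log.add("second: " + name));
        log.add("before connect"); // no observer has received anything so far
        names.connect();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the same way, a test that wants to observe the names emitted by freeFlowEmps() would typically subscribe its observers to the returned ConnectableObservable<String> before calling connect().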
