This last recipe shows how Spring 5 can integrate with other Reactive Streams implementations such as RxJava 2:
- To use RxJava 2 in the Spring 5 project, add the following Maven dependency to the pom.xml file:
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.1.0</version>
    </dependency>
- Create a service interface, EmployeeRxJavaService, that declares the following methods, one for each RxJava type and operation covered in this recipe:
    public interface EmployeeRxJavaService {
        public Observable<Employee> getEmployeesRx();
        public Single<Employee> getEmployeeRx(int empid);
        public Flowable<String> getFirstNamesRx();
        public Flowable<String> getEmpNamesRx();
        public Flowable<String> getEmpNamesParallelRx();
        public Flowable<String> combinedStreamRx(List<String> others);
        public ConnectableObservable<String> freeFlowEmps();
    }
- Observable<T> in RxJava plays a role similar to that of the Publisher<T> types in Reactor Core: it pushes data to its subscribers, following the observer design pattern. Unlike Flowable<T>, it does not implement the Reactive Streams Publisher<T> interface and does not support backpressure. The following service method, getEmployeesRx(), converts the list of employee records into a stream of data using Observable<T>:
    @Service
    public class EmployeeRxJavaServiceImpl implements EmployeeRxJavaService {

        @Autowired
        private EmployeeDao employeeDaoImpl;

        @Override
        public Observable<Employee> getEmployeesRx() {
            Observable<Employee> publishedEmployees =
                Observable.fromIterable(employeeDaoImpl.getEmployees());
            return publishedEmployees;
        }
    }
- To subscribe to an Observable<T>, either pass an io.reactivex.functions.Consumer<T> or implement the callbacks of Observer<T>. RxJava 2 ships its own set of functional interfaces in io.reactivex.functions and, unlike Reactor Core, does not use the Java 1.8 java.util.function types, although its interfaces can still be written as lambda expressions. The following test class shows both ways of subscribing to an observable:
    @RunWith(SpringJUnit4ClassRunner.class)
    @WebAppConfiguration
    @ContextConfiguration(classes = { SpringDbConfig.class, SpringDispatcherConfig.class })
    public class TestEmployeeRxJavaService {

        @Autowired
        private EmployeeRxJavaService employeeRxJavaServiceImpl;

        @Test
        public void testEmployeeData() {
            Observer<Employee> mySubscription = new Observer<Employee>() {
                @Override
                public void onComplete() {
                    System.out.println("subscription completed");
                }

                @Override
                public void onError(Throwable ex) {
                    System.out.println("problems encountered" + ex.getMessage());
                }

                @Override
                public void onNext(Employee emp) {
                    System.out.format("Employee: %s ", emp.getEmpId());
                }

                @Override
                public void onSubscribe(Disposable arg0) {
                    System.out.println("subscription started");
                }
            };
            employeeRxJavaServiceImpl.getEmployeesRx()
                .subscribe(mySubscription);
        }

        @Test
        public void testEmployeeDataConsumer() {
            Consumer<Employee> consume = (emp) -> {
                System.out.println(emp.getFirstName());
            };
            employeeRxJavaServiceImpl.getEmployeesRx()
                .subscribe(consume);
        }
    }
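As a minimal sketch of a third subscription style, the overload of subscribe() that takes separate lambdas for onNext, onError, and onComplete can replace a full Observer<T> implementation. The class name ObservableSubscribeSketch and the in-memory list of names are assumptions made for illustration; they stand in for the Spring-managed service and DAO used in this recipe.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ObservableSubscribeSketch {

    // Hypothetical stand-in for employeeDaoImpl.getEmployees()
    static List<String> sampleNames() {
        return Arrays.asList("Anna", "Ben", "Carla");
    }

    // Subscribes with three lambdas and records every signal that arrives
    static List<String> collectSignals() {
        List<String> signals = new ArrayList<>();
        Observable.fromIterable(sampleNames())
            .subscribe(
                name -> signals.add("next:" + name),                  // onNext
                error -> signals.add("error:" + error.getMessage()),  // onError
                () -> signals.add("complete"));                       // onComplete
        return signals;
    }

    public static void main(String[] args) {
        collectSignals().forEach(System.out::println);
    }
}
```

Because no scheduler is involved, the source emits synchronously inside subscribe(), so the list is fully populated by the time collectSignals() returns.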
- To create a stream that emits exactly one value or an error, use Single<T>. It is not a subclass of Observable<T>; it behaves much like a Mono<T> publisher, except that it cannot complete empty, and it signals either onSuccess() or onError(). The following getEmployeeRx() method emits an Employee record or an error by running a Callable task:
    @Override
    public Single<Employee> getEmployeeRx(int empid) {
        Callable<Employee> task = () -> employeeDaoImpl.getEmployee(empid);
        Single<Employee> emp = Single.fromCallable(task);
        return emp;
    }
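The two outcomes of a Single<T> can be sketched without the DAO as follows. The class SingleFromCallableSketch, the NAMES map, and the describe() helper are hypothetical names introduced only for this illustration; the lookup throws an exception for an unknown id so that the error path is visible.

```java
import io.reactivex.Single;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

class SingleFromCallableSketch {

    // Hypothetical in-memory replacement for the employee DAO
    static final Map<Integer, String> NAMES = new HashMap<>();
    static {
        NAMES.put(1, "Anna");
    }

    // The Callable runs only when someone subscribes to the Single
    static Single<String> findName(int id) {
        Callable<String> task = () -> {
            String name = NAMES.get(id);
            if (name == null) {
                throw new IllegalArgumentException("no employee " + id);
            }
            return name;
        };
        return Single.fromCallable(task);
    }

    // Maps the success value, turns an error into a fallback value, then blocks
    static String describe(int id) {
        return findName(id)
            .map(name -> "found " + name)
            .onErrorReturn(error -> "failed: " + error.getMessage())
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(describe(1));
        System.out.println(describe(2));
    }
}
```

Note that in RxJava 2 a Callable that returns null also ends in onError(), with a NullPointerException, so a DAO lookup that returns null for a missing record surfaces as an error rather than as an empty result.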
- Where Reactor Core has Flux<T>, RxJava has Flowable<T>, which supports backpressure along with transformation, grouping, and scheduling operators. The method getFirstNamesRx() streams the employee records, extracts each first name, and converts it to uppercase. Its mapper returns a Reactor Mono<String>, which flatMap() accepts because Mono<T> implements the Reactive Streams Publisher<T> interface:
    @Override
    public Flowable<String> getFirstNamesRx() {
        Function<Employee, Publisher<String>> firstNames = (emp) ->
            Mono.just(emp.getFirstName())
                .map(String::toUpperCase);
        Flowable<String> emps = Flowable
            .fromIterable(employeeDaoImpl.getEmployees())
            .flatMap(firstNames);
        return emps;
    }
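The same flatMap() shape can be sketched with RxJava types only. This version swaps the Reactor Mono<T> for Flowable.just(), which is an assumption made to keep the example free of the Reactor dependency; the class name FlatMapUppercaseSketch and the list of names are likewise hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import java.util.Arrays;
import java.util.List;
import org.reactivestreams.Publisher;

class FlatMapUppercaseSketch {

    // Each name becomes a one-element inner Publisher that flatMap() merges
    static List<String> upperCaseNames(List<String> names) {
        // io.reactivex.functions.Function, not java.util.function.Function
        Function<String, Publisher<String>> toUpper =
            name -> Flowable.just(name).map(String::toUpperCase);
        return Flowable.fromIterable(names)
            .flatMap(toUpper)
            .toList()        // collect into a Single<List<String>>
            .blockingGet();  // block only for the sake of the demo
    }

    public static void main(String[] args) {
        System.out.println(upperCaseNames(Arrays.asList("Anna", "Ben")));
    }
}
```

When every inner stream holds a single synchronous value, as here, a plain map(String::toUpperCase) gives the same result with less overhead; flatMap() earns its place when the inner publisher is asynchronous or emits several items.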
- RxJava uses the factory methods of io.reactivex.schedulers.Schedulers to obtain io.reactivex.Scheduler instances for observables and observers. Calling subscribeOn() assigns the thread on which the subscription happens and on which the source starts emitting. Calling observeOn() assigns the thread on which the operators and Observer<T> callbacks (or lambda expressions) placed after it run:
    @Override
    public Flowable<String> getEmpNamesRx() {
        Scheduler observerWorker = Schedulers.single();
        Scheduler subscriberWorker = Schedulers.newThread();
        Function<Employee, String> names = (emp) ->
            emp.getFirstName() + emp.getLastName();
        Flowable<String> emps = Flowable
            .fromIterable(employeeDaoImpl.getEmployees())
            .map(names)
            .observeOn(observerWorker)
            .subscribeOn(subscriberWorker);
        return emps;
    }
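To make the difference between the two operators observable, the following sketch records the name of the thread that runs a map() placed before observeOn() and the name of the thread that runs a map() placed after it. The class name SchedulerThreadsSketch and the single sample value are assumptions for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

class SchedulerThreadsSketch {

    // Returns { thread that ran the first map(), thread that ran the second map() }
    static String[] threadNames() {
        return Flowable.just("Anna")
            // Upstream of observeOn(): runs on the subscribeOn() thread
            .map(name -> Thread.currentThread().getName())
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.single())
            // Downstream of observeOn(): runs on the observeOn() thread
            .map(upstream -> new String[] { upstream, Thread.currentThread().getName() })
            .blockingSingle();
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("emitted on:  " + names[0]);
        System.out.println("received on: " + names[1]);
    }
}
```

The position of subscribeOn() in the chain does not change which thread the source emits on, whereas observeOn() affects only the operators that come after it, so its position matters.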
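- Work can be moved onto a thread pool sized to the number of available processors by invoking Schedulers.computation(), as shown in the following code snippet. Note that subscribeOn() alone runs the whole sequence on a single thread of that pool; to process items in parallel, each item needs its own scheduled inner stream (for example, through flatMap()) or the Flowable<T> must be split with parallel():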
    @Override
    public Flowable<String> getEmpNamesParallelRx() {
        Function<Employee, String> names = (emp) -> {
            System.out.println("map thread: " + Thread.currentThread().getName());
            return emp.getFirstName().charAt(0) + emp.getLastName();
        };
        Flowable<String> parallelEmpFlux = Flowable
            .fromIterable(employeeDaoImpl.getEmployees())
            .map(names)
            .subscribeOn(Schedulers.computation());
        return parallelEmpFlux;
    }
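One way to spread the mapping across several computation threads is the parallel() operator of Flowable<T>, sketched below. This is an alternative to the recipe's method rather than a restatement of it, and it assumes an RxJava 2 release that includes ParallelFlowable (the API was still marked beta in the 2.1 line). The class name ParallelInitialsSketch and the first-name and last-name pairs are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;

class ParallelInitialsSketch {

    // Each String[] holds { firstName, lastName }
    static List<String> initials(List<String[]> people) {
        return Flowable.fromIterable(people)
            .parallel()                       // split the flow into parallel rails
            .runOn(Schedulers.computation())  // give each rail a computation worker
            .map(p -> p[0].charAt(0) + p[1])  // first initial + last name
            .sequential()                     // merge the rails back into one Flowable
            .sorted()                         // arrival order across rails is not fixed
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        List<String[]> people = Arrays.asList(
            new String[] { "Ben", "Kim" },
            new String[] { "Anna", "Lee" });
        System.out.println(initials(people));
    }
}
```

The sorted() call is there only to make the result deterministic, since items from different rails can arrive in any order after sequential().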
- RxJava also has its own operators for transforming and manipulating streams, such as sorted(), which sorts the stream elements in their natural order, and zipWith(), which pairs each stream element with an element from another source. Create the following method that uses these two operators:
    @Override
    public Flowable<String> combinedStreamRx(List<String> others) {
        Function<Employee, String> names = (emp) ->
            emp.getFirstName() + "---validated";
        Flowable<String> zipNames = Flowable
            .fromIterable(employeeDaoImpl.getEmployees())
            .map(names)
            .sorted()
            .zipWith(others, (str1, str2) -> String.format("%s. %s", str1, str2));
        return zipNames;
    }
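The effect of chaining sorted() and zipWith() can be checked on a small in-memory list. In this sketch the class name SortedZipSketch and both input lists are assumptions standing in for the employee records and the others argument.

```java
import io.reactivex.Flowable;
import java.util.Arrays;
import java.util.List;

class SortedZipSketch {

    // Sorts the decorated names, then pairs them position by position with 'others'
    static List<String> combine(List<String> names, List<String> others) {
        return Flowable.fromIterable(names)
            .map(name -> name + "---validated")
            .sorted()
            .zipWith(others, (str1, str2) -> String.format("%s. %s", str1, str2))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Carla", "Anna", "Ben");
        List<String> others = Arrays.asList("dev", "qa");
        // prints [Anna---validated. dev, Ben---validated. qa]
        System.out.println(combine(names, others));
    }
}
```

The zipped stream ends as soon as the shorter source runs out, which is why the third name has no counterpart in the output.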
- Just like Reactor Core, RxJava can also generate hot streams. A ConnectableObservable<T> does not start emitting when it is subscribed to; it waits until connect() is called. This allows several subscribers to register first and then receive the same emissions once the stream is connected. Create the following method that generates a ConnectableObservable<T> from a cold stream of Employee records:
    @Override
    public ConnectableObservable<String> freeFlowEmps() {
        List<String> rosterNames = new ArrayList<>();
        Function<Employee, String> familyNames = (emp) ->
            emp.getLastName().toUpperCase();
        ConnectableObservable<String> flowyNames = Observable
            .fromIterable(employeeDaoImpl.getEmployees())
            .map(familyNames)
            .cache()
            .publish();
        flowyNames.subscribe(System.out::println);
        flowyNames.subscribe(rosterNames::add);
        return flowyNames;
    }
- Lastly, create a test method that will execute the preceding method:
    @Test
    public void testConnectFluxProcessor() {
        employeeRxJavaServiceImpl.freeFlowEmps().connect();
    }
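The role of connect() can be sketched in isolation by logging what each subscriber receives and when. The class name ConnectableSketch, the sample last names, and the log list are assumptions for illustration only.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ConnectableSketch {

    // Records the order in which two subscribers receive items around connect()
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ConnectableObservable<String> names = Observable
            .fromIterable(Arrays.asList("lee", "kim"))
            .map(String::toUpperCase)
            .publish();                       // nothing is emitted yet

        names.subscribe(n -> log.add("first:" + n));
        names.subscribe(n -> log.add("second:" + n));
        log.add("before connect");            // both subscribers are still waiting

        names.connect();                      // emission to both subscribers starts here
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "before connect" entry always comes first in the log, which shows that subscribing alone does not start the stream; each item is then delivered to both subscribers before the next item is emitted.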