The previous chapters covered Mono<T> and Flux<T> streams and how they behave when executed from test methods. This time, let us apply these streams to our asynchronous and reactive services by following these steps:
- Before we start, add the Maven dependencies of Reactor Stream 1.0 and Reactor Core 3.x to pom.xml. These libraries were also added in the previous chapter.
- Add the following abstract methods to the existing EmployeeService from the previous recipe:
    public interface EmployeeService {
        // refer to sources
        public Flux<Employee> readEmployeesFlux(int age);
        public Flux<Employee> readEmployeesByDescAge();
        public Flux<Employee> readEmployeesByAscLastName();
        public Flux<String> readEmpFirstNames();
        public Mono<Double> getAveAge();
    }
- Then, add the following asynchronous method implementations to the existing EmployeeServiceImpl, using Flux<T> and Mono<T> stream operations that run on a single thread for both publisher and subscriber operations. The following readEmpFirstNames() implementation uses flatMap() to extract the first names of all employees in uppercase and sort() to order them ascendingly with a Comparator:
    @Service
    public class EmployeeServiceImpl implements EmployeeService {
        // refer to sources

        @Override
        public Flux<String> readEmpFirstNames() {
            // Maps each employee to a Mono that emits the uppercase first name
            Function<Employee, Mono<String>> mapProcess = (emp) ->
                Mono.just(emp).map((e) -> {
                    System.out.println("flux:map task executor: "
                        + Thread.currentThread().getName());
                    return e.getFirstName().toUpperCase();
                });
            // Natural (ascending) String ordering
            Comparator<String> strComp = (s1, s2) -> {
                System.out.println("flux:sort task executor: "
                    + Thread.currentThread().getName());
                return s1.compareTo(s2);
            };
            Flux<String> names = Flux.fromIterable(employeeDaoImpl.getEmployees())
                .flatMap(mapProcess)
                .sort(strComp);
            return names;
        }
    }
- Add the following service implementations that use two separate threads for publisher and subscriber operations:
    @Override
    public Flux<Employee> readEmployeesFlux(int age) {
        Scheduler subWorker = Schedulers.newSingle("sub-thread");
        Scheduler pubWorker = Schedulers.newSingle("pub-thread");
        Predicate<Employee> validAge = (e) -> {
            System.out.println("flux:filter task executor: "
                + Thread.currentThread().getName());
            return e.getAge() > age;
        };
        Supplier<Flux<Employee>> deferredTask = () -> {
            System.out.println("flux:defer task executor: "
                + Thread.currentThread().getName());
            return Flux.fromIterable(employeeDaoImpl.getEmployees());
        };
        // defer() and filter() run on subWorker; signals after publishOn()
        // are delivered on pubWorker
        Flux<Employee> deferred = Flux.defer(deferredTask)
            .filter(validAge)
            .subscribeOn(subWorker)
            .publishOn(pubWorker);
        return deferred;
    }

    @Override
    public Flux<Employee> readEmployeesByDescAge() {
        Scheduler subWorker = Schedulers.newSingle("sub-thread");
        Scheduler pubWorker = Schedulers.newSingle("pub-thread");
        Supplier<Flux<Employee>> deferredTask = () -> {
            System.out.println("flux:defer task executor: "
                + Thread.currentThread().getName());
            return Flux.fromIterable(employeeDaoImpl.getEmployees());
        };
        // Orders employees from oldest to youngest
        Comparator<Employee> descAge = (e1, e2) -> {
            System.out.println("flux:sort task executor: "
                + Thread.currentThread().getName());
            if (e1.getAge().compareTo(e2.getAge()) == 0) {
                return 0;
            } else if (e1.getAge().compareTo(e2.getAge()) > 0) {
                return -1;
            } else {
                return 1;
            }
        };
        Flux<Employee> deferred = Flux.defer(deferredTask)
            .sort(descAge)
            .subscribeOn(subWorker)
            .publishOn(pubWorker);
        return deferred;
    }
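The descAge comparator above spells out the descending order by hand. As a minimal sketch, the same ordering can be built from the JDK's Comparator factory methods. The nested Employee class here is a hypothetical stand-in for the recipe's model (only firstName and age are assumed), and the example uses a plain List instead of a Flux so that it runs without Reactor on the classpath:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class DescendingAgeSketch {

    // Hypothetical, simplified stand-in for the recipe's Employee model.
    static class Employee {
        private final String firstName;
        private final Integer age;

        Employee(String firstName, Integer age) {
            this.firstName = firstName;
            this.age = age;
        }

        String getFirstName() { return firstName; }
        Integer getAge() { return age; }
    }

    // Oldest first: compare by age, then reverse the natural order.
    static final Comparator<Employee> DESC_AGE =
            Comparator.comparing(Employee::getAge).reversed();

    // Returns first names ordered by descending age; the input list is not modified.
    static List<String> firstNamesByDescAge(List<Employee> employees) {
        List<Employee> copy = new ArrayList<>(employees);
        copy.sort(DESC_AGE);
        List<String> names = new ArrayList<>();
        for (Employee e : copy) {
            names.add(e.getFirstName());
        }
        return names;
    }

    public static void main(String[] args) {
        List<Employee> sample = List.of(
                new Employee("Anna", 31),
                new Employee("Ben", 45),
                new Employee("Cara", 28));
        System.out.println(firstNamesByDescAge(sample));
    }
}
```

A comparator built this way could be passed to sort() in place of the lambda, although it would no longer print the executing thread name, which the recipe relies on to show where the sort runs.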
- Lastly, add the following service method, which returns a Mono<T> stream and uses generic functional interfaces such as ToIntFunction<T> in implementing stream operations:
    @Override
    public Mono<Double> getAveAge() {
        // Extracts the age of each employee as a primitive int
        ToIntFunction<Employee> sizeEmpArr = (e) -> {
            System.out.println("flux:toIntFunction task executor: "
                + Thread.currentThread().getName());
            return e.getAge();
        };
        // Computes the average age when the Mono is subscribed to
        Callable<Double> task = () -> {
            System.out.println("flux:callable task executor: "
                + Thread.currentThread().getName());
            return employeeDaoImpl.getEmployees().stream()
                .mapToInt(sizeEmpArr)
                .average()
                .getAsDouble();
        };
        Mono<Double> aveAge = Mono.fromCallable(task);
        return aveAge;
    }
The use of @Async, Callable<T>, and other related APIs is not recommended in these types of services due to some proxy-related issues.
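One detail of getAveAge() is worth noting: average() returns an OptionalDouble, and getAsDouble() throws NoSuchElementException when the employee list is empty. The following is a minimal sketch of a more defensive Callable, using only the JDK and a plain list of ages instead of the recipe's DAO. The class name, the method names, and the 0.0 fallback are assumptions made for illustration:

```java
import java.util.List;
import java.util.OptionalDouble;
import java.util.concurrent.Callable;

class AverageAgeSketch {

    // Averages the given ages; falls back to 0.0 for an empty list
    // (assumption: 0.0 is an acceptable default for the caller).
    static double averageAge(List<Integer> ages) {
        OptionalDouble average = ages.stream()
                .mapToInt(Integer::intValue)
                .average();
        return average.orElse(0.0);
    }

    // Wraps the computation in a Callable so that it only runs when called,
    // which is the shape Mono.fromCallable() expects in the recipe.
    static Callable<Double> averageAgeTask(List<Integer> ages) {
        return () -> averageAge(ages);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(averageAgeTask(List.of(30, 40, 50)).call());
        System.out.println(averageAgeTask(List.of()).call());
    }
}
```

Whether a default value or an empty Mono is the better response to an empty list depends on what the callers of the service expect.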
- Save all files. Create a test class to test each service above.