How to do it...

In the previous chapters, we have provided details about Mono<T> and Flux<T> streams and how they behave when executed using test methods. This time, let us apply the following streams in our asynchronous and reactive services by following these steps:

  1. Before we start, add the Maven dependencies of Reactor Stream 1.0 and Reactor Core 3.x in pom.xml. These libraries are also added in the previous chapter.
  2. Add the following template methods on our existing EmployeeService of the previous recipe:
public interface EmployeeService { 
   // refer to sources 
   public Flux<Employee> readEmployeesFlux(int age); 
   public Flux<Employee> readEmployeesByDescAge(); 
   public Flux<Employee> readEmployeesByAscLastName(); 
   public Flux<String> readEmpFirstNames(); 
   public Mono<Double> getAveAge(); 
} 
  1. Then, add the following implementation of asynchronous methods in its existing EmployeeServiceImpl using Flux<T> and Mono<T> stream operations that utilize only one thread for both publisher and subscriber operations. Add the following readEmpFirstNames() implementation that uses flatMap() to extract the first names of all employees and sort() them ascendingly using Comparator:
@Service 
public class EmployeeServiceImpl implements EmployeeService { 
    
// refer to sources   
 
   @Override 
   public Flux<String> readEmpFirstNames() { 
      Function<Employee, Mono<String>> mapProcess =  
(emp) -> Mono.just(emp).map((e)->{ 
            System.out.println("flux:map task executor: " +  
               Thread.currentThread().getName()); 
            return e.getFirstName().toUpperCase(); 
      }); 
      Comparator<String> strComp = (s1, s2) ->{ 
         System.out.println("flux:sort task executor: " +  
            Thread.currentThread().getName()); 
         return s1.compareTo(s2); 
      }; 
   Flux<String> names =  
     Flux.fromIterable(employeeDaoImpl 
.getEmployees()).flatMap(mapProcess) 
  .sort(strComp); 
      return names; 
   } 
} 
  1. Add the following service implementation that uses two separate threads for publisher and subscriber operations:
@Override 
public Flux<Employee> readEmployeesFlux(int age) { 
  Scheduler subWorker = Schedulers.newSingle("sub-thread"); 
  Scheduler pubWorker = Schedulers.newSingle("pub-thread"); 
  Predicate<Employee> validAge = (e) -> { 
    System.out.println("flux:filter task executor: " +  
    Thread.currentThread().getName()); 
    return e.getAge() > age; 
  }; 
  Supplier<Flux<Employee>> deferredTask = ()->{ 
    System.out.println("flux:defer task executor: " +  
    Thread.currentThread().getName()); 
    return Flux.fromIterable(employeeDaoImpl.getEmployees()); 
  }; 
  Flux<Employee> deferred = Flux.defer(deferredTask) 
  .filter(validAge) 
  .subscribeOn(subWorker) 
  .publishOn(pubWorker); 
  return deferred; 
} 
 
@Override 
public Flux<Employee> readEmployeesByDescAge() { 
  Scheduler subWorker = Schedulers.newSingle("sub-thread"); 
  Scheduler pubWorker = Schedulers.newSingle("pub-thread"); 
  Supplier<Flux<Employee>> deferredTask = ()->{ 
    System.out.println("flux:defer task executor: "+  
    Thread.currentThread().getName()); 
    return Flux.fromIterable(employeeDaoImpl.getEmployees()); 
  }; 
  Comparator<Employee> descAge = (e1, e2) -> { 
    System.out.println("flux:sort task executor: " +  
    Thread.currentThread().getName()); 
    if(e1.getAge().compareTo(e2.getAge()) == 0){ 
      return 0; 
    } else if(e1.getAge().compareTo(e2.getAge()) > 0){ 
      return -1; 
    } else return 1; 
  }; 
  Flux<Employee> deferred = Flux.defer(deferredTask) 
  .sort(descAge) 
  .subscribeOn(subWorker) 
  .publishOn(pubWorker); 
  return deferred; 
} 
  1. Lastly, add the following service methods that returns Mono<T> streams and uses some of the generic functional interfaces such as ToIntFunction<T> in implementing stream operations:
@Override 
public Mono<Double> getAveAge() { 
  ToIntFunction<Employee> sizeEmpArr = (e) -> { 
    System.out.println("flux:toIntFunction task  
    executor: " + Thread.currentThread().getName()); 
    return e.getAge(); 
  }; 
  Callable<Double> task = () ->{ 
    System.out.println("flux:callable task  executor: " + 
Thread.currentThread().getName()); return employeeDaoImpl.getEmployees().stream() .mapToInt(sizeEmpArr).average().getAsDouble(); }; Mono<Double> aveAge= Mono.fromCallable(task); return aveAge; }
The use of @Async , Callable<T> and other related APIs are not recommended on these types of services due to some proxy-related issues.
  1. Save all files. Create a test class to test each service above.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.21.98.207