How to do it...

To implement concurrent and parallel Stream emissions, perform the following steps:

  1. Add the following service class in our org.packt.reactive.code.service package. This class contains method templates that will detail parallelism and concurrency based on Reactive Streams specification:
public interface EmployeeParallelStreamservice { 
    
   public Flux<String> parallelEmployeeNames(); 
   public Flux<GroupedFlux<Integer, Integer>>  
         parallelGrpAvg(); 
   public Flux<String> repeatExecs(); 
    
} 
  1. Create an implementation class EmployeeParallelStreamserviceImpl that will contain a method parallelEmployeeNames() designed to utilize a pool of eight threads in order to get the full names of all employees. All these threads must work on each task in parallel mode. In short, this method implements a parallel Flux:
@Service 
public class EmployeeParallelStreamserviceImpl  
implements EmployeeParallelStreamservice{ 
    
   @Autowired 
   private EmployeeDao employeeDaoImpl; 
 
   @Override 
   public Flux<String> parallelEmployeeNames() { 
      Function<Employee, String> names = (emp) -> { 
         System.out.println("flatMap thread: "  
+ Thread.currentThread().getName()); 
         return emp.getFirstName().charAt(0) +  
            emp.getLastName(); 
      }; 
       
      Flux<String> parallelEmpFlux =  
         Flux.fromIterable 
(employeeDaoImpl.getEmployees()) 
        .parallel(8)  
        .runOn (Schedulers.parallel()) 
        .sequential() 
        .map(names); 
         
      return parallelEmpFlux; 
} 
} 
  1. Another method repeatExecs() implements another design for parallelism wherein the Stream will be repeatedly run m times after each onComplete():
@Override 
public Flux<String> repeatExecs() { 
      Function<Employee, String> names =  
(emp) ->{ 
         System.out.println("flatMap thread: "  
+ Thread.currentThread().getName()); 
         return emp.getFirstName().charAt(0) +  
               emp.getLastName(); 
      }; 
       
      Flux<String> parallelEmpFlux =  
         Flux.fromIterable(employeeDaoImpl.getEmployees()) 
             .repeat(2) 
             .parallel(8)  
             .runOn (Schedulers.parallel()) 
             .sequential() 
                .map(names) 
             .doOnSubscribe(subscription -> { 
                     System.out.println(subscription); 
               });  
      return parallelEmpFlux; 
} 
  1. The last method in this recipe will show us a solution on how to implement grouped parallel Flux. This implementation is so rare that it is only opted for when there is a need to parallelize the subscriber's callback too:
@Override 
public Flux<GroupedFlux<Integer, Integer>> parallelGrpAvg() { 
      Function<Employee, Integer> ages = (emp) -> { 
         System.out.println("flatMap thread: "  
+ Thread.currentThread().getName()); 
         return emp.getAge(); 
      }; 
       
      Flux<GroupedFlux<Integer, Integer>> parallelEmpFlux =  
         Flux.fromIterable(employeeDaoImpl.getEmployees()) 
           .delaySubscription( 
Duration.of(500L, ChronoUnit.MILLIS)) 
           .parallel(8)  
           .runOn (Schedulers.parallel()) 
           .map(ages) 
           .groups(); 
        return parallelEmpFlux; 
}  
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.235.176