How to do it...

All the preceding recipes used sequential streams, which is the default mode for most stream-based operations. Let us now build the parallel stream version by following these steps:

  1. Create a service class, EmployeeParallelStreamService, inside the same package as the previous service classes. Add the following version of showAllEmployees() that uses parallelStream() and forEach() to print all employee records:
public void showAllEmployees(){
   Consumer<Employee> showAll = (e) -> {
      System.out.format("%s %s %d\n",
         e.getFirstName(), e.getLastName(), e.getAge());
   };
   employeeDaoImpl.getEmployees()
      .parallelStream()
      .forEach(showAll);
}
  2. Create a test class to show that parallelStream() runs the pipelined Stream operations on multiple threads:
@RunWith(SpringJUnit4ClassRunner.class) 
@WebAppConfiguration 
@ContextConfiguration(classes = { SpringDbConfig.class, 
  SpringDispatcherConfig.class }) 
public class TestEmployeeParallelStreamService { 
   
  @Autowired 
  private EmployeeParallelStreamService 
    employeeParallelStreamService; 
   
  @Test 
  public void testParallelViewRecs(){ 
    System.out.println("*********ITERATION 1********"); 
    employeeParallelStreamService.showAllEmployees(); 
    System.out.println("********ITERATION 2**********"); 
    employeeParallelStreamService.showAllEmployees(); 
    System.out.println("********************************"); 
  } 
}

Because forEach() on a parallel stream does not guarantee any processing order, each execution of the test method may print the employee records in a different order, as follows:

************EXECUTION 1************* 
Sherwin Tragura 22 
Joel Enage 45 
Jerry Mayo 42 
Sherwin Tragura 38 
************EXECUTION 2************* 
Sherwin Tragura 22 
Joel Enage 45 
Sherwin Tragura 38 
Jerry Mayo 42 
************************************ 
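
If the original order matters, forEachOrdered() can be used in place of forEach(). The following is only a minimal sketch, not part of the recipe's service classes: the class name OrderedParallelDemo and its helper methods are hypothetical, and plain strings stand in for the Employee records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; it is not part of the recipe's project.
class OrderedParallelDemo {

    // forEachOrdered() respects the encounter order of the source list,
    // even though the stream is parallel.
    static List<String> collectOrdered(List<String> records) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        records.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    // forEach() may visit the elements in any order on a parallel stream.
    static List<String> collectUnordered(List<String> records) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        records.parallelStream().forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
            "Sherwin Tragura 22", "Joel Enage 45",
            "Jerry Mayo 42", "Sherwin Tragura 38");
        System.out.println("forEachOrdered: " + collectOrdered(records));
        System.out.println("forEach:        " + collectUnordered(records));
    }
}
```

The ordered variant gives up some of the freedom that makes parallel execution worthwhile, so it is best reserved for cases where the output order is really required.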
  3. Add the following service methods to EmployeeParallelStreamService to show which threads parallelStream() uses compared with the sequential version:
public double getSequentialAverageAge(){
   ToIntFunction<Employee> sizeEmpArr = (e) -> {
      System.out.println("Thread: " +
         Thread.currentThread().getName());
      return e.getAge();
   };
   return employeeDaoImpl.getEmployees()
      .stream()
      .mapToInt(sizeEmpArr)
      .average().getAsDouble();
}

public double getParallelAverageAge(){
   ToIntFunction<Employee> sizeEmpArr = (e) -> {
      System.out.println("Thread: " +
         Thread.currentThread().getName());
      return e.getAge();
   };
   return employeeDaoImpl.getEmployees()
      .parallelStream()
      .mapToInt(sizeEmpArr)
      .average().getAsDouble();
}
  4. Add a test method to TestEmployeeParallelStreamService that executes the preceding service methods:
@Test
public void compareComputation(){
   System.out.println("**********PARALLEL************");
   System.out.println("Average: " +
      employeeParallelStreamService.getParallelAverageAge());
   System.out.println("**********SEQUENTIAL**********");
   System.out.println("Average: " +
      employeeParallelStreamService.getSequentialAverageAge());
   System.out.println("*********************************");
}

Executing this test will give us the following console log:

************PARALLEL**************** 
Thread: main 
Thread: ForkJoinPool.commonPool-worker-3 
Thread: ForkJoinPool.commonPool-worker-1 
Thread: ForkJoinPool.commonPool-worker-2 
Average: 36.75 
************SEQUENTIAL************** 
Thread: main 
Thread: main 
Thread: main 
Thread: main 
Average: 36.75 
************************************ 
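
The same comparison can be tried outside the Spring test context. The following is a rough sketch under stated assumptions: the class ThreadNameDemo and the method averageAge() are hypothetical names, and a list of Integer ages (the four ages from the log above) stands in for the Employee records returned by the DAO.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; it is not part of the recipe's project.
class ThreadNameDemo {

    // Computes the average age and records the name of every thread
    // that processed at least one element.
    static double averageAge(List<Integer> ages, boolean parallel,
                             Set<String> threadNames) {
        return (parallel ? ages.parallelStream() : ages.stream())
            .mapToInt(age -> {
                threadNames.add(Thread.currentThread().getName());
                return age;
            })
            .average()
            .orElse(0.0); // avoids an exception when the list is empty
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(22, 45, 42, 38);

        Set<String> sequentialThreads = ConcurrentHashMap.newKeySet();
        double sequential = averageAge(ages, false, sequentialThreads);
        System.out.println("Sequential: " + sequential + " " + sequentialThreads);

        Set<String> parallelThreads = ConcurrentHashMap.newKeySet();
        double parallel = averageAge(ages, true, parallelThreads);
        System.out.println("Parallel:   " + parallel + " " + parallelThreads);
    }
}
```

Both calls return the same average; only the set of thread names differs. The worker names and their number depend on the machine and on how the common pool schedules the work, so they will likely not match the log above exactly.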
  5. Add another service method that creates a custom thread pool for parallelStream() to use when computing the average employee age:
public double getAverageMoreProcessors()
      throws InterruptedException, ExecutionException{
   ToIntFunction<Employee> sizeEmpArr = (e) -> {
      System.out.println("Thread: " +
         Thread.currentThread().getName());
      return e.getAge();
   };
   // Use parallelStream() so that the work is split across the pool's
   // threads; a plain stream() would run on a single worker thread
   Callable<Double> task =
      () -> employeeDaoImpl.getEmployees()
               .parallelStream()
               .mapToInt(sizeEmpArr)
               .average().getAsDouble();
   ForkJoinPool forkJoinPool = new ForkJoinPool(4);
   try {
      return forkJoinPool.submit(task).get();
   } finally {
      // Release the worker threads once the result is available
      forkJoinPool.shutdown();
   }
}
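
The custom pool technique can also be sketched as a standalone program. The names CustomPoolDemo and averageInPool() are hypothetical, Integer ages stand in for the Employee records, and join() is used instead of get() only to avoid the checked exceptions. Note that a parallel stream started inside a ForkJoinPool task runs on that pool's workers in current JDKs, but this is observed behavior rather than a guarantee documented in the Stream API, so treat it as an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; it is not part of the recipe's project.
class CustomPoolDemo {

    // Runs the parallel average inside a dedicated pool and records
    // the names of the threads that processed the elements.
    static double averageInPool(List<Integer> ages, int parallelism,
                                Set<String> threadNames) {
        Callable<Double> task = () -> ages.parallelStream()
            .mapToInt(age -> {
                threadNames.add(Thread.currentThread().getName());
                return age;
            })
            .average()
            .orElse(0.0); // avoids an exception when the list is empty

        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() waits for the result without checked exceptions
            return pool.submit(task).join();
        } finally {
            pool.shutdown(); // always release the worker threads
        }
    }

    public static void main(String[] args) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        double average = averageInPool(
            Arrays.asList(22, 45, 42, 38), 4, threadNames);
        System.out.println("Average: " + average);
        System.out.println("Threads: " + threadNames);
    }
}
```

A dedicated pool keeps long-running stream work from occupying the common pool that other parallel streams in the same JVM share, at the cost of creating and shutting down extra threads.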