All the preceding recipes used sequential streams, which is the default mode for most stream-based operations. Let us now build the parallel stream versions by following these steps:
- Create a service class, EmployeeParallelStreamService, inside the same package as the previous service classes. Add the following version of showAllEmployees(), which uses parallelStream() and forEach() to print all employee records:
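    // Requires: import java.util.function.Consumer;
    public void showAllEmployees() {
        // Prints one record per line; on a parallel stream this may run on any worker thread
        Consumer<Employee> showAll = (e) -> {
            System.out.format("%s %s %d%n", e.getFirstName(), e.getLastName(), e.getAge());
        };
        employeeDaoImpl.getEmployees()
                .parallelStream()
                .forEach(showAll);
    }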
- Create the test class that will demonstrate that parallelStream() spreads the pipelined Stream operations across multiple threads:
    @RunWith(SpringJUnit4ClassRunner.class)
    @WebAppConfiguration
    @ContextConfiguration(classes = { SpringDbConfig.class, SpringDispatcherConfig.class })
    public class TestEmployeeParallelStreamService {

        @Autowired
        private EmployeeParallelStreamService employeeParallelStreamService;

        // Prints all records twice so that the two orderings can be compared
        @Test
        public void testParallelViewRecs() {
            System.out.println("************EXECUTION 1*************");
            employeeParallelStreamService.showAllEmployees();
            System.out.println("************EXECUTION 2*************");
            employeeParallelStreamService.showAllEmployees();
            System.out.println("************************************");
        }
    }
Each execution may print the employee records in a different order, because forEach() on a parallel stream does not preserve the encounter order of the source list:
    ************EXECUTION 1*************
    Sherwin Tragura 22
    Joel Enage 45
    Jerry Mayo 42
    Sherwin Tragura 38
    ************EXECUTION 2*************
    Sherwin Tragura 22
    Joel Enage 45
    Sherwin Tragura 38
    Jerry Mayo 42
    ************************************
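When a stable order matters, forEachOrdered() is the usual alternative to forEach(). The following is only a minimal sketch of the difference, not part of the recipe's service classes: the class ParallelOrderDemo and its methods are hypothetical, and plain strings stand in for the Employee records shown in the output above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone demo; the strings mirror the records in the console output.
class ParallelOrderDemo {

    static final List<String> NAMES = Arrays.asList(
            "Sherwin Tragura 22", "Joel Enage 45", "Jerry Mayo 42", "Sherwin Tragura 38");

    // forEach() on a parallel stream visits every element once, in no guaranteed order.
    static List<String> visitUnordered() {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        NAMES.parallelStream().forEach(seen::add);
        return new ArrayList<>(seen);
    }

    // forEachOrdered() keeps the encounter order of the source list, even in parallel.
    static List<String> visitOrdered() {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        NAMES.parallelStream().forEachOrdered(seen::add);
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println("forEach:        " + visitUnordered());
        System.out.println("forEachOrdered: " + visitOrdered());
    }
}
```

The ordered variant gives up some of the parallel speed-up for the terminal operation, so it is worth using only where the output order is actually significant.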
- Add the following service methods to EmployeeParallelStreamService. Each one prints the name of the thread that processes every element, so that the threads used by parallelStream() can be compared with those of the sequential version:
    // Requires: import java.util.function.ToIntFunction;
    // Note: getAsDouble() throws NoSuchElementException when the employee list is empty.
    public double getSequentialAverageAge() {
        // Logs the current thread before returning the age of each employee
        ToIntFunction<Employee> sizeEmpArr = (e) -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
            return e.getAge();
        };
        return employeeDaoImpl.getEmployees()
                .stream()
                .mapToInt(sizeEmpArr)
                .average().getAsDouble();
    }

    public double getParallelAverageAge() {
        // Same mapper as above; only the stream type differs
        ToIntFunction<Employee> sizeEmpArr = (e) -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
            return e.getAge();
        };
        return employeeDaoImpl.getEmployees()
                .parallelStream()
                .mapToInt(sizeEmpArr)
                .average().getAsDouble();
    }
- Add a test method to TestEmployeeParallelStreamService that executes both of the preceding service methods:
    @Test
    public void compareComputation() {
        System.out.println("**********PARALLEL************");
        System.out.println("Average: " + employeeParallelStreamService.getParallelAverageAge());
        System.out.println("**********SEQUENTIAL**********");
        System.out.println("Average: " + employeeParallelStreamService.getSequentialAverageAge());
        System.out.println("*********************************");
    }
Executing this test will give us a console log similar to the following; the worker names and their order can vary between runs:
    ************PARALLEL****************
    Thread: main
    Thread: ForkJoinPool.commonPool-worker-3
    Thread: ForkJoinPool.commonPool-worker-1
    Thread: ForkJoinPool.commonPool-worker-2
    Average: 36.75
    ************SEQUENTIAL**************
    Thread: main
    Thread: main
    Thread: main
    Thread: main
    Average: 36.75
    ************************************
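The same comparison can be reproduced without the Spring context. The sketch below is an illustration under stated assumptions rather than the recipe's code: StreamThreadsDemo and its average() method are hypothetical names, the ages are the four values from the output above, and thread names are collected into a set instead of being printed per element.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

// Hypothetical stand-alone demo; the ages are taken from the recipe's sample output.
class StreamThreadsDemo {

    static final List<Integer> AGES = Arrays.asList(22, 45, 42, 38);

    // Computes the average age and records the name of every thread that maps an element.
    static double average(boolean parallel, Set<String> threads) {
        Stream<Integer> ages = parallel ? AGES.parallelStream() : AGES.stream();
        return ages
                .mapToInt(age -> {
                    threads.add(Thread.currentThread().getName());
                    return age;
                })
                .average()
                .orElse(0.0); // avoids NoSuchElementException on an empty list
    }

    public static void main(String[] args) {
        Set<String> sequential = ConcurrentHashMap.newKeySet();
        Set<String> parallel = ConcurrentHashMap.newKeySet();
        System.out.println("Sequential average: " + average(false, sequential) + " on " + sequential);
        System.out.println("Parallel average:   " + average(true, parallel) + " on " + parallel);
    }
}
```

The sequential run always reports a single thread, namely the caller. The parallel run typically reports the caller plus some common-pool workers, although with so few elements it may occasionally use only one thread.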
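- Add another service method that runs parallelStream() inside a custom ForkJoinPool, instead of the common pool, when computing the average employee age. This technique relies on the stream forking its tasks in the pool of the thread that starts the terminal operation, which is implementation behavior rather than a documented guarantee of the Stream API: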
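    // Requires: java.util.concurrent.Callable, ExecutionException, ForkJoinPool
    public double getAverageMoreProcessors() throws InterruptedException, ExecutionException {
        ToIntFunction<Employee> sizeEmpArr = (e) -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
            return e.getAge();
        };
        // parallelStream() is required here; a plain stream() would run on a single pool thread
        Callable<Double> task = () -> employeeDaoImpl.getEmployees()
                .parallelStream()
                .mapToInt(sizeEmpArr)
                .average().getAsDouble();
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        try {
            return forkJoinPool.submit(task).get();
        } finally {
            forkJoinPool.shutdown(); // release the pool's worker threads
        }
    }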
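To check which threads such a method really uses, the custom-pool technique can be tried in isolation. This is a minimal sketch under stated assumptions: CustomPoolDemo and averageInPool() are hypothetical names, the ages are the four sample values from the earlier output, and join() is used in place of get() only to keep the example free of checked exceptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

// Hypothetical stand-alone demo of running a parallel stream inside a dedicated pool.
class CustomPoolDemo {

    static final List<Integer> AGES = Arrays.asList(22, 45, 42, 38);

    // Runs the parallel pipeline inside a pool of the given size and records the thread names.
    static double averageInPool(int parallelism, Set<String> threads) {
        Callable<Double> task = () -> AGES.parallelStream()
                .mapToInt(age -> {
                    threads.add(Thread.currentThread().getName());
                    return age;
                })
                .average()
                .orElse(0.0);
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() reports failures as unchecked exceptions; get() would work as well
            return pool.submit(task).join();
        } finally {
            pool.shutdown(); // always release the worker threads
        }
    }

    public static void main(String[] args) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        System.out.println("Average: " + averageInPool(4, threads));
        System.out.println("Threads: " + threads);
    }
}
```

On current JDKs the recorded names belong to the custom pool's workers rather than to main or the common pool, but since this behavior is not part of the Stream specification it is worth verifying on the target runtime.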