To implement concurrent and parallel Stream emissions, perform the following steps:
- Add the following service interface to the org.packt.reactive.code.service package. It declares the methods that will demonstrate parallelism and concurrency on top of the Reactive Streams specification:
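package org.packt.reactive.code.service;

import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;

public interface EmployeeParallelStreamservice {
    public Flux<String> parallelEmployeeNames();
    public Flux<GroupedFlux<Integer, Integer>> parallelGrpAvg();
    public Flux<String> repeatExecs();
}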
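- Create an implementation class, EmployeeParallelStreamserviceImpl, with a method parallelEmployeeNames() that splits the employee stream into eight rails and produces a short name (first initial followed by last name) for each employee. The rails are scheduled on Schedulers.parallel(), whose worker count defaults to the number of CPU cores, so eight rails do not necessarily mean eight threads. Because the rails run concurrently, the names may arrive in a different order from the source list. In short, this method implements a parallel Flux: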
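import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Service
public class EmployeeParallelStreamserviceImpl implements EmployeeParallelStreamservice {

    @Autowired
    private EmployeeDao employeeDaoImpl;

    @Override
    public Flux<String> parallelEmployeeNames() {
        // Short name: first initial followed by the last name.
        Function<Employee, String> names = (emp) -> {
            System.out.println("flatMap thread: " + Thread.currentThread().getName());
            return emp.getFirstName().charAt(0) + emp.getLastName();
        };
        Flux<String> parallelEmpFlux = Flux.fromIterable(employeeDaoImpl.getEmployees())
            .parallel(8)                   // split into eight rails
            .runOn(Schedulers.parallel())  // run each rail on a parallel worker
            .map(names)                    // map on the rails, before merging
            .sequential();                 // merge the rails into one Flux
        return parallelEmpFlux;
    }
}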
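To make the split, run, and merge stages of a parallel Flux easier to picture, here is a minimal JDK-only sketch of the same idea. It is a model, not Reactor code: the class ParallelRailsSketch, the method mapOnRails(), and the simplified Employee type are hypothetical names introduced for illustration, and the strict round-robin split is an assumption about how items are dealt onto rails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// JDK-only model of split -> run -> merge; this is NOT Reactor code.
class ParallelRailsSketch {

    // Hypothetical stand-in for the recipe's Employee entity.
    static final class Employee {
        final String firstName;
        final String lastName;

        Employee(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Same name format as the recipe: first initial followed by the last name.
    static String shortName(Employee emp) {
        return emp.firstName.charAt(0) + emp.lastName;
    }

    static <T, R> List<R> mapOnRails(List<T> source, int rails, Function<T, R> mapper) {
        // Split: deal the items onto the rails round-robin (assumed policy).
        List<List<T>> railItems = new ArrayList<>();
        for (int r = 0; r < rails; r++) {
            railItems.add(new ArrayList<>());
        }
        for (int i = 0; i < source.size(); i++) {
            railItems.get(i % rails).add(source.get(i));
        }

        ExecutorService pool = Executors.newFixedThreadPool(rails);
        try {
            // Run: each rail maps its own items on a pool thread.
            List<CompletableFuture<List<R>>> pending = new ArrayList<>();
            for (List<T> items : railItems) {
                pending.add(CompletableFuture.supplyAsync(() -> {
                    List<R> mapped = new ArrayList<>();
                    for (T item : items) {
                        mapped.add(mapper.apply(item));
                    }
                    return mapped;
                }, pool));
            }
            // Merge: collect rail by rail, so the result may not be in source order.
            List<R> merged = new ArrayList<>();
            for (CompletableFuture<List<R>> rail : pending) {
                merged.addAll(rail.join());
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Employee> employees = List.of(
                new Employee("Ada", "Lovelace"),
                new Employee("Grace", "Hopper"),
                new Employee("Alan", "Turing"));
        System.out.println(mapOnRails(employees, 8, ParallelRailsSketch::shortName));
    }
}
```

The point of the sketch is that the mapping function runs inside each rail before the merge. Any work placed after the merge step would run on a single merged sequence again.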
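- Another method, repeatExecs(), adds repeat(2) ahead of the parallel stage. repeat(n) re-subscribes to the source n times, each time after the previous run signals onComplete(), so in current Reactor versions the employee list is emitted n + 1 times in total (three times here):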
@Override
public Flux<String> repeatExecs() {
    Function<Employee, String> names = (emp) -> {
        System.out.println("flatMap thread: " + Thread.currentThread().getName());
        return emp.getFirstName().charAt(0) + emp.getLastName();
    };
    Flux<String> parallelEmpFlux = Flux.fromIterable(employeeDaoImpl.getEmployees())
        .repeat(2)                     // replay the source after each onComplete()
        .parallel(8)
        .runOn(Schedulers.parallel())
        .map(names)                    // map on the rails, before merging
        .sequential()
        // Placed downstream of repeat(), so it fires once per subscriber,
        // not once per repetition.
        .doOnSubscribe(subscription -> {
            System.out.println(subscription);
        });
    return parallelEmpFlux;
}
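The repetition count is easy to get wrong, so here is a small JDK-only sketch of the counting rule. It is a model rather than Reactor code, and RepeatSketch and replay() are hypothetical names. It assumes the numRepeat + 1 semantics described in the current Reactor Javadoc, and it reflects the fact that employeeDaoImpl.getEmployees() is evaluated once, when repeatExecs() is called, so each repetition re-iterates the same list instead of querying the DAO again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only model of repeat(numRepeat) over an already-built Iterable; NOT Reactor code.
class RepeatSketch {

    // Plays `source` once, then numRepeat more times: numRepeat + 1 passes in total.
    static <T> List<T> replay(Iterable<T> source, long numRepeat) {
        List<T> out = new ArrayList<>();
        for (long pass = 0; pass <= numRepeat; pass++) {
            // Each pass asks the same Iterable for a fresh iterator.
            for (T item : source) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = List.of("ALovelace", "GHopper"); // hypothetical data
        AtomicInteger passes = new AtomicInteger();
        // Wrap the list so that every new iteration is counted.
        Iterable<String> counted = () -> {
            passes.incrementAndGet();
            return names.iterator();
        };
        List<String> replayed = replay(counted, 2);
        System.out.println(replayed.size() + " items in " + passes.get() + " passes");
    }
}
```

If the data should be fetched again on every repetition, the fetch itself would have to happen at subscription time rather than when the pipeline is assembled.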
- The last method in this recipe shows how to implement a grouped parallel Flux. Calling groups() exposes each rail as a GroupedFlux keyed by its rail index, so that each rail can be subscribed to separately. This approach is rarely needed; choose it only when the subscriber's callbacks must run in parallel as well:
@Override
public Flux<GroupedFlux<Integer, Integer>> parallelGrpAvg() {
    Function<Employee, Integer> ages = (emp) -> {
        System.out.println("flatMap thread: " + Thread.currentThread().getName());
        return emp.getAge();
    };
    Flux<GroupedFlux<Integer, Integer>> parallelEmpFlux =
        Flux.fromIterable(employeeDaoImpl.getEmployees())
            .delaySubscription(Duration.ofMillis(500L))
            .parallel(8)
            .runOn(Schedulers.parallel())
            .map(ages)
            .groups();
    return parallelEmpFlux;
}
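The shape of the result, one group of ages per rail keyed by rail index, can be pictured with the JDK-only sketch below. It is a model, not Reactor code. RailGroupsSketch, groupByRail(), and averagePerRail() are hypothetical names, the round-robin split is an assumed policy, and the per-rail average is only a guess at the follow-up step suggested by the method name parallelGrpAvg(); the recipe itself returns the groups without averaging them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// JDK-only model of rails exposed as keyed groups; this is NOT Reactor code.
class RailGroupsSketch {

    // Deals the ages onto `rails` groups round-robin, keyed by zero-based rail index.
    // Simplification: rails that receive no items do not appear in the map.
    static Map<Integer, List<Integer>> groupByRail(List<Integer> ages, int rails) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int i = 0; i < ages.size(); i++) {
            groups.computeIfAbsent(i % rails, rail -> new ArrayList<>()).add(ages.get(i));
        }
        return groups;
    }

    // Assumed follow-up step: one average per rail.
    static Map<Integer, Double> averagePerRail(Map<Integer, List<Integer>> groups) {
        Map<Integer, Double> averages = new TreeMap<>();
        groups.forEach((rail, values) -> averages.put(rail,
                values.stream().mapToInt(Integer::intValue).average().orElse(Double.NaN)));
        return averages;
    }

    public static void main(String[] args) {
        List<Integer> ages = List.of(30, 40, 50, 60); // hypothetical employee ages
        Map<Integer, List<Integer>> groups = groupByRail(ages, 2);
        System.out.println("groups:   " + groups);
        System.out.println("averages: " + averagePerRail(groups));
    }
}
```

Note that the key identifies the rail, not a property of the employee, so the grouping says nothing about the data itself. To group by a business attribute such as department, a keyed grouping operator would be the better fit.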