How to do it...

To apply schedulers on our Stream operators, let us perform the following steps:

  1. Create a service interface, EmployeeScheduledStreamservice, that declares the following methods. Each method applies a custom scheduler (a thread executor) to a Stream instead of leaving all the work on the calling thread:
import reactor.core.publisher.Flux;

public interface EmployeeScheduledStreamservice {
   Flux<Employee> createPublisherThread();
   Flux<Employee> createSubscriberThread();
   Flux<Employee> createBothThreads();
   Flux<Employee> createPubAndMain();
   Flux<String> createSchedGroupPub();
   Flux<String> createSchedGroupSub();
   Flux<Employee> elasticFlow();
   Flux<String> selectNamesScheduler();
}
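  2. The first method to implement is createPublisherThread(), which creates a Scheduler backed by a single dedicated worker thread and applies it with publishOn(). The publishOn() operator moves the signals delivered downstream of it (onNext, onComplete, and onError) onto that worker, so the subscriber's callbacks run in the background and the calling thread stays available for other processes. In this chain, defer() and filter() sit upstream of publishOn(), so they still execute on the thread that subscribes. The following is the EmployeeScheduledStreamserviceImpl class, which creates a separate Scheduler for this purpose: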
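import java.util.function.Predicate;
import java.util.function.Supplier;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Service
public class EmployeeScheduledStreamserviceImpl
      implements EmployeeScheduledStreamservice {

   @Autowired
   private EmployeeDao employeeDaoImpl;

   @Override
   public Flux<Employee> createPublisherThread() {
      Scheduler pubWorker = Schedulers.newSingle("pub-thread");
      Predicate<Employee> validAge = (e) -> {
         System.out.println("filter thread: "
            + Thread.currentThread().getName());
         return e.getAge() > 25;
      };
      Supplier<Flux<Employee>> deferredTask = () -> {
         System.out.println("defer thread: "
            + Thread.currentThread().getName());
         return Flux.fromIterable(employeeDaoImpl.getEmployees());
      };
      // Only the signals downstream of publishOn() move to pub-thread.
      Flux<Employee> deferred =
         Flux.defer(deferredTask).filter(validAge)
            .publishOn(pubWorker);
      return deferred;
   }
}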
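  3. Implement a method that applies a separate single-threaded worker with subscribeOn(). The subscribeOn() operator moves the subscription itself, and with it the emission of the source, onto the worker. Because no publishOn() follows, the defer() and filter() callbacks as well as the subscriber's callbacks all run on sub-thread. The following is another addition to the service class: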
@Override
public Flux<Employee> createSubscriberThread() {
   Scheduler subWorker = Schedulers.newSingle("sub-thread");
   Predicate<Employee> validAge = (e) -> {
      System.out.println("filter thread: "
         + Thread.currentThread().getName());
      return e.getAge() > 25;
   };
   Supplier<Flux<Employee>> deferredTask = () -> {
      System.out.println("defer thread: "
         + Thread.currentThread().getName());
      return Flux.fromIterable(employeeDaoImpl.getEmployees());
   };
   // The whole chain is subscribed, and therefore executed, on sub-thread.
   Flux<Employee> deferred =
      Flux.defer(deferredTask).filter(validAge)
         .subscribeOn(subWorker);
   return deferred;
}
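  4. Next, implement createBothThreads(), which combines the two operators: subscribeOn() runs the subscription and the upstream operators (defer() and filter()) on sub-thread, while publishOn() hands the resulting elements over to pub-thread, where the subscriber's Consumer callbacks execute: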
@Override
public Flux<Employee> createBothThreads() {
   Scheduler subWorker = Schedulers.newSingle("sub-thread");
   Scheduler pubWorker = Schedulers.newSingle("pub-thread");
   Predicate<Employee> validAge = (e) -> {
      System.out.println("filter thread: "
         + Thread.currentThread().getName());
      return e.getAge() > 25;
   };
   Supplier<Flux<Employee>> deferredTask = () -> {
      System.out.println("defer thread: "
         + Thread.currentThread().getName());
      return Flux.fromIterable(employeeDaoImpl.getEmployees());
   };
   // Upstream work runs on sub-thread; downstream signals run on pub-thread.
   Flux<Employee> deferred =
      Flux.defer(deferredTask).filter(validAge)
         .subscribeOn(subWorker)
         .publishOn(pubWorker);
   return deferred;
}
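The division of labor between two workers can also be pictured without Reactor. The following is a minimal plain-JDK sketch and not Reactor's implementation: the class TwoWorkerSketch, its method names, and the sample ages are hypothetical, and a CompletableFuture chain stands in for the Flux. The filtering stage runs on a thread named sub-thread (the role played by subscribeOn() in this step), and its result is handed to a thread named pub-thread (the role played by publishOn()).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical plain-JDK analogy; not part of the recipe's service class.
class TwoWorkerSketch {

   // Creates a single-threaded executor whose only thread carries the given name.
   static ExecutorService namedSingle(String name) {
      return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
   }

   // Filters on "sub-thread", then delivers the result on "pub-thread".
   // Returns the thread names seen by each stage, in execution order.
   static List<String> run(List<Integer> ages, List<Integer> delivered) {
      ExecutorService subWorker = namedSingle("sub-thread");
      ExecutorService pubWorker = namedSingle("pub-thread");
      List<String> threads = Collections.synchronizedList(new ArrayList<>());
      try {
         CompletableFuture
            .supplyAsync(() -> {
               threads.add(Thread.currentThread().getName());
               return ages.stream()
                  .filter(a -> a > 25)
                  .collect(Collectors.toList());
            }, subWorker)
            .thenAcceptAsync(valid -> {
               threads.add(Thread.currentThread().getName());
               delivered.addAll(valid);
            }, pubWorker)
            .join();
      } finally {
         subWorker.shutdown();
         pubWorker.shutdown();
      }
      return threads;
   }

   public static void main(String[] args) {
      List<Integer> delivered = new ArrayList<>();
      System.out.println(run(Arrays.asList(22, 30, 41, 25), delivered));
      System.out.println(delivered);
   }
}
```

The calling thread only waits in join() here so that the sketch can print its result; in the recipe, the caller simply returns the Flux and remains free.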
  5. There are cases where we want to isolate risky, expensive, or deferred operations from the rest of the chain by moving only those operations onto a background thread. In this case, the publisher uses two threads: the calling thread, which runs defer(), and the thread created by the scheduler, which runs everything placed after publishOn(), including filter(). The following method, createPubAndMain(), shows this option for using Schedulers:
@Override
public Flux<Employee> createPubAndMain() {
   Scheduler pubWorker = Schedulers.newSingle("pub-thread");
   Predicate<Employee> validAge = (e) -> {
      System.out.println("filter thread: "
         + Thread.currentThread().getName());
      return e.getAge() > 25;
   };
   Supplier<Flux<Employee>> deferredTask = () -> {
      System.out.println("defer thread: "
         + Thread.currentThread().getName());
      return Flux.fromIterable(employeeDaoImpl.getEmployees());
   };
   // defer() runs on the calling thread; filter() follows publishOn()
   // and therefore runs on pub-thread.
   Flux<Employee> deferred = Flux.defer(deferredTask)
      .publishOn(pubWorker)
      .filter(validAge);
   return deferred;
}
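The split between the calling thread and the background worker can be illustrated with a small plain-JDK sketch. It is only an analogy under stated assumptions: CallerAndWorkerSketch, loadAges(), filterOnWorker(), and the sample ages are hypothetical names and data, and an ExecutorService stands in for the Scheduler. The data is loaded on the calling thread, and only the filtering step is handed to a thread named pub-thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical plain-JDK analogy; not part of the recipe's service class.
class CallerAndWorkerSketch {

   // Stands in for the deferred data access; runs on whoever calls it.
   static List<Integer> loadAges(List<String> log) {
      log.add(Thread.currentThread().getName());
      return Arrays.asList(22, 30, 41, 25);
   }

   // Loads on the calling thread, then filters on "pub-thread".
   static List<Integer> filterOnWorker(List<String> log) {
      ExecutorService pubWorker =
         Executors.newSingleThreadExecutor(r -> new Thread(r, "pub-thread"));
      try {
         List<Integer> ages = loadAges(log);
         return CompletableFuture.supplyAsync(() -> {
            log.add(Thread.currentThread().getName());
            return ages.stream()
               .filter(a -> a > 25)
               .collect(Collectors.toList());
         }, pubWorker).join();
      } finally {
         pubWorker.shutdown();
      }
   }

   public static void main(String[] args) {
      List<String> log = new ArrayList<>();
      System.out.println(filterOnWorker(log));
      System.out.println(log);
   }
}
```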
  6. If a pool of workers is required rather than a single thread, Schedulers.newParallel() fits that requirement. The following implementation of createSchedGroupPub() creates a pool of 8 workers, one of which is picked by publishOn() to run the downstream flatMap() callback. The recipe also creates a separate single-threaded scheduler on which each inner Mono is subscribed:
// Additionally requires java.util.function.Function
// and reactor.core.publisher.Mono.
@Override
public Flux<String> createSchedGroupPub() {
   Scheduler subWorker = Schedulers.newSingle("sub-thread");
   Scheduler parallelGrp = Schedulers.newParallel("pub-grp", 8);
   Function<Employee, String> allCapsNames = (emp) ->
      emp.getFirstName().toUpperCase() + " "
         + emp.getLastName().toUpperCase();
   Flux<String> grpFlux =
      Flux.fromIterable(employeeDaoImpl.getEmployees())
         .publishOn(parallelGrp)
         .flatMap((emp) -> {
            System.out.println("flatMap thread: "
               + Thread.currentThread().getName());
            return Mono.just(emp)
               .map(allCapsNames)
               .subscribeOn(subWorker);
         });
   return grpFlux;
}
This method does not introduce any parallelism yet. publishOn() picks a single worker from the pool for the whole sequence, so every flatMap() callback reports the same pub-grp thread. The recipe only shows how to create a group of workers and how the platform chooses one available thread from it to do the publisher's tasks.
  7. A group of workers can also be applied with subscribeOn(). The following method creates a pool of 8 workers and subscribes each inner Mono produced inside flatMap() on that pool, so the map() transformations may run on different sub-grp threads, while the flatMap() callback itself runs on pub-thread:
@Override
public Flux<String> createSchedGroupSub() {
   Scheduler pubWorker = Schedulers.newSingle("pub-thread");
   Scheduler parallelGrp = Schedulers.newParallel("sub-grp", 8);
   Function<Employee, String> allCapsNames = (emp) ->
      emp.getFirstName().toUpperCase() + " "
         + emp.getLastName().toUpperCase();

   Flux<String> strFlux =
      Flux.fromIterable(employeeDaoImpl.getEmployees())
         .publishOn(pubWorker)
         .flatMap((emp) -> {
            System.out.println("flatMap thread: "
               + Thread.currentThread().getName());
            // Each inner Mono is subscribed on a worker from the pool.
            return Mono.just(emp)
               .map(allCapsNames)
               .subscribeOn(parallelGrp);
         });
   return strFlux;
}
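To see why handing each element to a pool lets the work spread across threads, consider this plain-JDK analogy. It is a sketch under assumptions and not Reactor code: PoolSketch, upperCaseOnPool(), and the sample names are hypothetical, and a fixed thread pool of 8 threads stands in for Schedulers.newParallel("sub-grp", 8). Each name is submitted as its own task, so any thread in the pool may pick it up. Unlike flatMap(), which may interleave inner results, invokeAll() returns the results in submission order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-JDK analogy; not part of the recipe's service class.
class PoolSketch {

   // Names of the pool threads that actually ran a task.
   static final Set<String> usedThreads = ConcurrentHashMap.newKeySet();

   // Upper-cases every name as a separate task on a pool of 8 threads.
   static List<String> upperCaseOnPool(List<String> names) {
      AtomicInteger counter = new AtomicInteger(1);
      ExecutorService pool = Executors.newFixedThreadPool(8,
         r -> new Thread(r, "sub-grp-" + counter.getAndIncrement()));
      try {
         List<Callable<String>> tasks = new ArrayList<>();
         for (String name : names) {
            tasks.add(() -> {
               usedThreads.add(Thread.currentThread().getName());
               return name.toUpperCase();
            });
         }
         List<String> result = new ArrayList<>();
         for (Future<String> f : pool.invokeAll(tasks)) {
            result.add(f.get());
         }
         return result;
      } catch (Exception e) {
         throw new IllegalStateException(e);
      } finally {
         pool.shutdown();
      }
   }

   public static void main(String[] args) {
      System.out.println(
         upperCaseOnPool(Arrays.asList("John Doe", "Jane Roe", "Sam Poe")));
      System.out.println(usedThreads);
   }
}
```

Which pool threads end up in usedThreads varies from run to run, which is the point of the sketch: the caller does not choose the thread, the pool does.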
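  8. When it is not known in advance whether a single worker or a group of n workers is needed, Schedulers.newElastic() can be used. It creates worker threads on demand and caches idle ones so that they can be reused by later Stream operations. Note that newer Reactor releases deprecate the elastic scheduler in favor of Schedulers.boundedElastic():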
@Override
public Flux<Employee> elasticFlow() {
   Scheduler elastic = Schedulers.newElastic("elastic-worker");
   Predicate<Employee> validAge = (e) -> {
      System.out.println("filter thread: "
         + Thread.currentThread().getName());
      return e.getAge() > 25;
   };
   Supplier<Flux<Employee>> deferredTask = () -> {
      System.out.println("defer thread: "
         + Thread.currentThread().getName());
      return Flux.fromIterable(employeeDaoImpl.getEmployees());
   };
   // The whole chain is subscribed on a worker supplied by the elastic scheduler.
   Flux<Employee> deferred = Flux.defer(deferredTask)
      .filter(validAge)
      .subscribeOn(elastic);
   return deferred;
}
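The idea of creating workers on demand resembles the JDK's cached thread pool, which the following sketch uses as a stand-in. This is an analogy only, with no claim that Reactor implements its elastic scheduler this way; ElasticSketch and threadsUsedFor() are hypothetical names. The sketch starts several jobs that all wait for each other, which forces the pool to create one thread per overlapping job.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK analogy; not part of the recipe's service class.
class ElasticSketch {

   // Runs `tasks` overlapping jobs and returns how many distinct threads served them.
   static int threadsUsedFor(int tasks) {
      ExecutorService elastic = Executors.newCachedThreadPool();
      Set<String> names = ConcurrentHashMap.newKeySet();
      CountDownLatch allStarted = new CountDownLatch(tasks);
      CountDownLatch allDone = new CountDownLatch(tasks);
      try {
         for (int i = 0; i < tasks; i++) {
            elastic.execute(() -> {
               names.add(Thread.currentThread().getName());
               allStarted.countDown();
               try {
                  // Wait until every job is running, so no thread is reused early.
                  allStarted.await(5, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
               } finally {
                  allDone.countDown();
               }
            });
         }
         allDone.await(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      } finally {
         elastic.shutdown();
      }
      return names.size();
   }

   public static void main(String[] args) {
      System.out.println(threadsUsedFor(3));
   }
}
```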
  9. Lastly, some publisher operators accept a Scheduler as a parameter. One example is windowTimeout(), which splits the Flux<T> sequence into sub-Flux windows, starting from the first element. A window closes once it holds the given maximum number of elements or the given duration has elapsed, and the timing is handled by the supplied Scheduler. The following method shows how windowTimeout() works with a Scheduler:
// Additionally requires java.time.Duration
// and org.springframework.util.StringUtils.
@Override
public Flux<String> selectNamesScheduler() {
   Scheduler winWorker = Schedulers.newSingle("window-thread");
   Function<Employee, String> allCapsNames = (emp) ->
      emp.getFirstName().toUpperCase() + " "
         + emp.getLastName().toUpperCase();
   // Windows of at most 2 employees, closed after 20 ms at the latest.
   Flux<String> convertWindows =
      Flux.fromIterable(employeeDaoImpl.getEmployees())
         .windowTimeout(2, Duration.ofMillis(20), winWorker)
         .flatMap(window -> window
            .map(allCapsNames)
            .collectList()
            .map(names ->
               StringUtils.collectionToCommaDelimitedString(names))
         );
   return convertWindows;
}
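The shape of the strings produced by this method can be previewed with a plain-JDK sketch. It models only the size limit of the windows and ignores the 20 ms timeout, which in the real operator can close a window earlier. WindowSketch, joinInWindows(), and the sample names are hypothetical, and String.join(",", ...) stands in for StringUtils.collectionToCommaDelimitedString().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-JDK analogy; not part of the recipe's service class.
class WindowSketch {

   // Groups the names into consecutive windows of at most maxSize elements
   // and joins each window into one comma-delimited string.
   static List<String> joinInWindows(List<String> names, int maxSize) {
      if (maxSize <= 0) {
         throw new IllegalArgumentException("maxSize must be positive");
      }
      List<String> joined = new ArrayList<>();
      for (int start = 0; start < names.size(); start += maxSize) {
         int end = Math.min(start + maxSize, names.size());
         joined.add(String.join(",", names.subList(start, end)));
      }
      return joined;
   }

   public static void main(String[] args) {
      System.out.println(joinInWindows(
         Arrays.asList("JOHN DOE", "JANE ROE", "SAM POE"), 2));
   }
}
```

With three names and a window size of 2, the sketch yields two strings: the first holds two names separated by a comma, and the second holds the remaining name.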