How to do it...

This is the first recipe that implements a continuous stream:

  1. Let us create a service interface, EmployeeHotStreamservice, that contains the following template methods:
public interface EmployeeHotStreamservice { 
   public ConnectableFlux<String> freeFlowEmps(); 
   public void monoProcessorGetEmployee(Integer id); 
   public void fluxProcessorGetEmployee(List<Integer> ids); 
   public void validateNamesTopic(List<String> names); 
   public void validateNamesWorkQueue(List<String> names); 
   public void validateNamesReplay(List<String> names); 
   public void validateNamesUnicast(List<String> names); 
} 
  2. The first method to implement is freeFlowEmps(), which uses ConnectableFlux<T> to implement a continuous data Stream flow. Add the following implementation class with the implemented freeFlowEmps() method. This method also introduces cache(), which stores the emitted elements so that later subscribers can still receive them:
@Service
public class EmployeeHotStreamserviceImpl
      implements EmployeeHotStreamservice {

   @Autowired
   private EmployeeDao employeeDaoImpl;

   @Override
   public ConnectableFlux<String> freeFlowEmps() {
      List<String> rosterNames = new ArrayList<>();
      Function<Employee, String> familyNames =
         (emp) -> emp.getLastName().toUpperCase();
      ConnectableFlux<String> flowyNames =
         Flux.fromIterable(employeeDaoImpl.getEmployees())
            .log().map(familyNames).cache().publish();
      flowyNames.subscribe(System.out::println);
      flowyNames.subscribe(rosterNames::add);
      return flowyNames;
   }
}
  3. To show the connection process, add the following test class that triggers the data Stream emission to the two subscribers; an autoConnect() alternative is sketched after this test:
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@ContextConfiguration(classes = { SpringDbConfig.class,
      SpringDispatcherConfig.class })
public class TestEmployeeHotStreamservice {

   @Autowired
   private EmployeeHotStreamservice employeeHotStreamserviceImpl;

   @Test
   public void testConnectFluxProcessor() {
      employeeHotStreamserviceImpl.freeFlowEmps().connect();
   }
}
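If you prefer not to call connect() manually, ConnectableFlux<T> also offers autoConnect(), which connects as soon as the first subscriber arrives. The following is a minimal sketch of that variant; the method name freeFlowEmpsAuto() is hypothetical and added here only for illustration:

public Flux<String> freeFlowEmpsAuto() {
   // autoConnect() starts the emission once the first subscriber
   // subscribes, so no explicit connect() call is needed
   Flux<String> flowyNames =
      Flux.fromIterable(employeeDaoImpl.getEmployees())
         .map(emp -> emp.getLastName().toUpperCase())
         .cache()
         .publish()
         .autoConnect();
   flowyNames.subscribe(System.out::println); // triggers the connection
   return flowyNames;
}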
  4. One of the best solutions for generating hot Streams is Processor<T>, which acts as a complete event broadcaster that drives the execution of all its Stream operations in at least one subscriber. The following method uses MonoProcessor<T>, a synchronous event broadcaster that signals at most one event to its subscriber(s) synchronously (a sample test follows the listing):
@Override
public void monoProcessorGetEmployee(Integer id) {
   MonoProcessor<Integer> future = MonoProcessor.create();
   Consumer<Integer> checkEmp = (rowId) -> {
      if (employeeDaoImpl.getEmployee(rowId) == null) {
         System.out.println("Employee with id: "
            + rowId + " does not exist.");
      } else {
         System.out.println("Employee with id: "
            + rowId + " exists.");
      }
   };

   Mono<Integer> engine = future
      .doOnNext(checkEmp)
      .doOnSuccess(emp -> {
         System.out.println("Employee's age is "
            + employeeDaoImpl.getEmployee(emp).getAge());
         System.out.println("Employee's dept is: "
            + employeeDaoImpl.getEmployee(emp).getDeptId());
      })
      // ex is null on successful termination, so guard before using it
      .doOnTerminate((sup, ex) -> {
         if (ex != null) {
            System.out.println("Transaction terminated with error: "
               + ex.getMessage());
         }
      })
      .doOnError(ex -> System.out.println("Error: "
         + ex.getMessage()));

   engine.subscribe(System.out::println);
   future.onNext(id);
   int valStream = future.block();
   System.out.println("Employee's ID again is: " + valStream);
}
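A quick way to exercise this method is another test inside TestEmployeeHotStreamservice; the test name and the sample id below are assumptions for illustration:

@Test
public void testMonoProcessor() {
   // Assumes an employee row with id 1 exists in the test database
   employeeHotStreamserviceImpl.monoProcessorGetEmployee(1);
}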
  5. Another processor is FluxProcessor<T>, which broadcasts data emission to its subscriber(s) while exposing the emission process itself through calls such as onNext() and onComplete(). Here it is instantiated through its EmitterProcessor<T> implementation (a sample test follows the listing):
@Override
public void fluxProcessorGetEmployee(List<Integer> ids) {
   Function<Integer, Integer> checkEmp = (id) -> {
      if (!(employeeDaoImpl.getEmployee(id) == null)) {
         return employeeDaoImpl.getEmployee(id).getAge();
      } else {
         return -1;
      }
   };
   FluxProcessor<Integer, Integer> cpuFlow =
      EmitterProcessor.create();
   Flux<Integer> fluxp = cpuFlow.map(checkEmp);
   Flux<Integer> gradientNum = cpuFlow.map((num) -> num + 1000);
   fluxp.subscribe(System.out::println);
   gradientNum.subscribe(System.out::println);

   for (Integer id : ids) {
      cpuFlow.onNext(id);
   }
   cpuFlow.onComplete();
}
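Again, a short test can drive this method; the ids below are sample values, assuming those rows exist:

@Test
public void testFluxProcessor() {
   // Each id is mapped to an age by the first subscriber
   // and to id + 1000 by the second
   employeeHotStreamserviceImpl
      .fluxProcessorGetEmployee(Arrays.asList(1, 2, 3));
}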
  6. ReplayProcessor<T> is a specialized EmitterProcessor<T> that caches all its Stream elements for future computations and also allows asynchronous event execution for its subscriber(s). Because of its built-in caching, this processor remembers previously emitted elements and replays them to any subscriber that arrives later, while maintaining an asynchronous boundary between subscribers. Following is validateNamesReplay(), which implements the basic semantics of ReplayProcessor<T> (a standalone sketch of the replay behavior follows the listing):
@Override
public void validateNamesReplay(List<String> names) {
   ReplayProcessor<String> replayProcessor =
      ReplayProcessor.create();
   Function<String, String> appendLic =
      (name) -> name.concat(".112234");
   Function<String, String> appendKey =
      (name) -> name.concat("-AEK2345J");
   Function<String, String> upperCase =
      (name) -> name.toUpperCase();
   Flux<String> formatter1 = replayProcessor
      .filter((s) -> s.length() > 4).map(appendLic);
   Flux<String> formatter2 = replayProcessor
      .filter((s) -> s.startsWith("J")).map(appendKey);
   Flux<String> formatter3 = replayProcessor
      .filter((s) -> s.endsWith("win")).map(upperCase);

   formatter1.subscribe(System.out::println);
   formatter2.subscribe(System.out::println);
   formatter3.subscribe(System.out::println);

   for (String name : names) {
      replayProcessor.onNext(name);
   }
   replayProcessor.onComplete();
}
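To see the replay behavior itself, note that a subscriber added after the emissions still receives every cached element. A minimal standalone sketch, not part of the recipe's service class:

ReplayProcessor<String> replay = ReplayProcessor.create();
replay.onNext("sherwin");
replay.onNext("john");
replay.onComplete();
// This late subscriber still receives "sherwin" and "john"
// because ReplayProcessor caches every emitted element
replay.subscribe(System.out::println);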
  7. TopicProcessor<T> is an asynchronous version of EmitterProcessor<T> that signals the execution of all its events per data Stream element asynchronously. Because of its highly concurrent behavior, this processor needs more backpressure handling to capture the execution of each subscriber correctly (a sample test follows the listing):
@Override
public void validateNamesTopic(List<String> names) {
   TopicProcessor<String> topicProcessor =
      TopicProcessor.create();
   Function<String, String> appendLic =
      (name) -> name.concat(".112234");
   Function<String, String> appendKey =
      (name) -> name.concat("-AEK2345J");
   Function<String, String> upperCase =
      (name) -> name.toUpperCase();
   Flux<String> formatter1 = topicProcessor
      .filter((s) -> s.length() > 4).map(appendLic);
   Flux<String> formatter2 = topicProcessor
      .filter((s) -> s.startsWith("J")).map(appendKey);
   Flux<String> formatter3 = topicProcessor
      .filter((s) -> s.endsWith("win")).map(upperCase);

   formatter1.subscribe(System.out::println);
   formatter2.subscribe(System.out::println);
   formatter3.subscribe(System.out::println);

   for (String name : names) {
      topicProcessor.onNext(name);
   }
   topicProcessor.onComplete();
}
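Because TopicProcessor<T> dispatches events on its own threads, a plain test method may finish before the subscribers print anything. The following is a rough test sketch; the names and the sleep duration are arbitrary assumptions:

@Test
public void testTopicProcessor() throws InterruptedException {
   employeeHotStreamserviceImpl.validateNamesTopic(
      Arrays.asList("Juan", "sherwin", "darwin"));
   // Crude wait so the asynchronous subscribers can finish
   // printing before the test JVM shuts down
   Thread.sleep(1000);
}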
  8. The next processor, WorkQueueProcessor<T>, is also an asynchronous signal broadcaster like TopicProcessor<T>, but it distributes each element to only the next available subscriber. The objective is to share the load fairly among all subscribers, in the same spirit as round-robin process distribution. Depending on which subscriber receives an element and on that subscriber's filter constraints, some elements may or may not appear in the output (a sample test follows the listing):
@Override
public void validateNamesWorkQueue(List<String> names) {
   WorkQueueProcessor<String> wqueueProcessor =
      WorkQueueProcessor.create();
   Function<String, String> appendLic =
      (name) -> name.concat(".112234");
   Function<String, String> appendKey =
      (name) -> name.concat("-AEK2345J");
   Function<String, String> upperCase =
      (name) -> name.toUpperCase();
   Flux<String> formatter1 = wqueueProcessor
      .filter((s) -> s.length() > 4).map(appendLic);
   Flux<String> formatter2 = wqueueProcessor
      .filter((s) -> s.startsWith("J")).map(appendKey);
   Flux<String> formatter3 = wqueueProcessor
      .filter((s) -> s.endsWith("win")).map(upperCase);

   formatter1.subscribe(System.out::println);
   formatter2.subscribe(System.out::println);
   formatter3.subscribe(System.out::println);

   for (String name : names) {
      wqueueProcessor.onNext(name);
   }
   wqueueProcessor.onComplete();
}
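The same test pattern applies here; because each name goes to only one subscriber, rerunning the test may print different results. The names below are arbitrary samples:

@Test
public void testWorkQueueProcessor() throws InterruptedException {
   employeeHotStreamserviceImpl.validateNamesWorkQueue(
      Arrays.asList("Juan", "sherwin", "darwin"));
   // Each name is consumed by exactly one of the three subscribers
   Thread.sleep(1000);
}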
  9. Lastly, UnicastProcessor<T> is a special kind of processor that caches data elements but can distribute them to strictly one subscriber; it is used in validateNamesUnicast(). UnicastProcessor<T> is usually used for events that require asynchronous, queue-based fusion of Streams (a sketch of the single-subscriber rule follows the listing):
@Override
public void validateNamesUnicast(List<String> names) {
   UnicastProcessor<String> unicastProcessor =
      UnicastProcessor.create();
   Function<String, String> appendLic =
      (name) -> name.concat(".112234");
   Function<String, String> appendKey =
      (name) -> name.concat("-AEK2345J");
   Function<String, String> upperCase =
      (name) -> name.toUpperCase();

   Flux<String> formatter1 = unicastProcessor
      .filter((s) -> s.length() > 4).map(appendLic);
   // UnicastProcessor allows only ONE subscriber, so the
   // subscribers below can no longer be attached:
   // Flux<String> formatter2 = unicastProcessor
   //    .filter((s) -> s.startsWith("J")).map(appendKey);
   // Flux<String> formatter3 = unicastProcessor
   //    .filter((s) -> s.endsWith("win")).map(upperCase);

   formatter1.subscribe(System.out::println);
   for (String name : names) {
      unicastProcessor.onNext(name);
   }
   unicastProcessor.onComplete();
}
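If a second subscriber does attach, UnicastProcessor<T> rejects it with an error delivered through onError rather than by throwing at the call site. A small standalone sketch to verify this; the println labels are arbitrary:

UnicastProcessor<String> unicast = UnicastProcessor.create();
unicast.subscribe(name -> System.out.println("first: " + name));
// The second subscription is rejected; the failure arrives in the
// error consumer as an IllegalStateException
unicast.subscribe(
   name -> System.out.println("second: " + name),
   ex -> System.out.println("rejected: " + ex.getMessage()));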