How to do it...

Date emissions in Streams are affected by the backpressure operations used to extract the data. Let us define some backpressure operations by doing the following steps:

  1. Create a separate service class, EmployeeBatchStreamservice that will showcase some abstract methods that will utilize time-related operators that are needed in setting up backpressure on Stream types:
public interface EmployeeBatchStreamservice {    
   public Mono<Employee> selectOneEmployee(); 
   public Flux<Employee> selectSomeEmpRecords(); 
   public Flux<List<Employee>> getEmployeesByBatch(); 
   public Flux<String>   getTimedFirstNames(); 
   public Flux<Employee> selectEmpDelayed(); 
   public Flux<Employee> getDeferredEmployees(); 
}
  1. The first event to implement is selectOneEmployee() that emits only one Stream object and converts it to a Mono<Employee> type. The doOnCancel() method is always invoked in time-related operators like this since these types of Stream operators trigger the onCancel() method periodically. Add this method to our EmployeeBatchStreamserviceImpl implementation class:
@Service 
public class EmployeeBatchStreamserviceImpl  
implements EmployeeBatchStreamservice { 
   
@Autowired 
private EmployeeDao employeeDaoImpl; 
    
@Override 
public Mono<Employee> selectOneEmployee() { 
      Runnable cancel = () ->{ 
         System.out.println("Stream is cancelled"); 
}; 
Mono<Employee> oneRecord =  
         Flux.fromIterable(employeeDaoImpl.getEmployees()) 
.doOnCancel(cancel) 
         .log() 
.take(1) 
.singleOrEmpty(); 
      return oneRecord; 
} 
} 
The main reason why there is a need to manage the emission of Mono<T> even though it only contains one piece of data is because in some circumstances, it may be empty.
  1. Next, the method selectSomeEmpRecords()transmits only those Stream objects that are only covered within a specific emission period and skips two of them during the operation:
@Override 
public Flux<Employee> selectSomeEmpRecords() { 
Flux<Employee> takeSomeRecs =  
      Flux.fromIterable(employeeDaoImpl.getEmployees()) 
.log() 
.skip(2) 
.take(Duration.ofMillis(4)); 
   return takeSomeRecs; 
} 
  1. The third method to implement emits groups of Stream objects wherein each group contains n elements:
@Override 
public Flux<List<Employee>> getEmployeesByBatch() { 
      Flux<List<Employee>> recordsByBatch =  
         Flux.fromIterable(employeeDaoImpl.getEmployees()) 
.log() 
.buffer(2); 
      return recordsByBatch; 
} 
The operator buffer(n) returns a Stream of Collection where each Collection contains the group of Stream objects.
  1. One of the most essential operators used in executing expensive or unstable sources of data is the defer() operator. It provides asynchronous execution to Publisher<T> that needs to be run in the background as an independent thread while proceeding with the next operators. This asynchronous method prevents other operators from interfering with the progress or status of the running Publisher<T>. The following method uses the defer() method to wrap an assumed heavy and unstable DAO transaction while filter() waits for its Predicate<T> to process the controlled data stream:
@Override 
public Flux<Employee> getDeferredEmployees() { 
      Predicate<Employee> validAge =  
(e) -> e.getAge() > 25; 
      Supplier<Flux<Employee>> deferredTask =  
()->Flux.fromIterable( 
employeeDaoImpl.getEmployees()); 
      Flux<Employee> deferred =  
         Flux.defer(deferredTask).log().filter(validAge); 
      return deferred; 
} 
  1. One way to manage defer() operations is to use timeout() to explicitly provide a period of waiting for the subscriber to fulfil its subscription. Following is a method implemented for the subscriber to wait 300 milliseconds, otherwise it will trigger its onError() method:
@Override 
   public Flux<String> getTimedFirstNames() { 
      Function<Employee, String> firstNames =  
(e) -> e.getFirstName(); 
      Supplier<Flux<String>> deferredTask =  
()->Flux.fromIterable( 
employeeDaoImpl.getEmployees()) 
                .map(firstNames); 
      Flux<String> timedDefer = Flux.defer(deferredTask) 
.log() 
.timeout(Duration.ofMillis(300)); 
      return timedDefer; 
} 
  1. To execute this method properly, be sure to override the onError() method of the subscriber just in case onCancel() is triggered. Create TestEmployeeBatchStream and add the following test method:
@Test 
public void testTimedFirstNames(){ 
      employeeBatchStreamserviceImpl.getTimedFirstNames() 
.subscribe(new Subscriber<String>(){ 
 
         @Override 
         public void onComplete() {    } 
 
         @Override 
         public void onError(Throwable arg0) { 
            System.out.println("time is out...."); 
         } 
 
         @Override 
         public void onNext(String data) { 
            System.out.println(data); 
} 
 
         @Override 
         public void onSubscribe(Subscription subs) { 
            subs.request(Long.MAX_VALUE); } 
          
      }); 
}
  1. If there is a need to delay the emission of each element for m seconds given a period of 1 hour, the operators take() and delayElements(m) are used. Combinations of these threaded operations can push any number of data Streams to the subscriber. A Null Stream object is also a valid, expected result so error handling must be ready for these kinds of computations:
@Override 
public Flux<Employee> selectEmpDelayed() { 
      Supplier<Flux<Employee>> deferredTask =  
()->Flux.fromIterable( 
employeeDaoImpl.getEmployees()); 
      Flux<Employee> oneRecord = Flux.defer(deferredTask) 
.take(Duration.ofHours(1)) 
.delayElements(Duration.ofSeconds(10)); 
      return oneRecord; 
} 
  1. The other way of implementing backpressure is to override the onSubscribe() method of Subscriber<T> to request from Publisher<T> the emission rate of the resulting stream. This Stream request is done by the subscriber through the use of request(t) wherein the subscriber will be expecting t Stream objects per emission. Open the test class TestEmployeeBatchStream and add the following test case:
@Test 
public void testByRequest(){ 
      Subscriber<Employee> subscriber =  
new Subscriber<Employee>(){ 
 
         @Override 
         public void onComplete() { } 
 
         @Override 
         public void onError(Throwable arg0) {  } 
 
         @Override 
         public void onNext(Employee emp) { 
            System.out.println(emp); 
         } 
 
         @Override 
         public void onSubscribe(Subscription subs) { 
            subs.request(2); 
         } 
      }; 
      employeeBatchStreamserviceImpl.selectSomeEmpRecords() 
.subscribe(subscriber); 
}  
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.245.1