Data emission in Streams is affected by the backpressure operators used to extract the data. Let us define some backpressure operations by doing the following steps:
- Create a separate service class, EmployeeBatchStreamservice, that declares the abstract methods which will utilize the time-related operators needed to set up backpressure on Stream types:
```java
public interface EmployeeBatchStreamservice {
    public Mono<Employee> selectOneEmployee();
    public Flux<Employee> selectSomeEmpRecords();
    public Flux<List<Employee>> getEmployeesByBatch();
    public Flux<String> getTimedFirstNames();
    public Flux<Employee> selectEmpDelayed();
    public Flux<Employee> getDeferredEmployees();
}
```
- The first method to implement is selectOneEmployee(), which emits only one Stream object and converts it to a Mono<Employee>. The doOnCancel() callback is registered here because take(1) cancels the upstream subscription as soon as its limit is reached, and that cancellation triggers the callback. Add this method to our EmployeeBatchStreamserviceImpl implementation class:
```java
@Service
public class EmployeeBatchStreamserviceImpl implements EmployeeBatchStreamservice {

    @Autowired
    private EmployeeDao employeeDaoImpl;

    @Override
    public Mono<Employee> selectOneEmployee() {
        Runnable cancel = () -> {
            System.out.println("Stream is cancelled");
        };
        Mono<Employee> oneRecord = Flux.fromIterable(employeeDaoImpl.getEmployees())
            .doOnCancel(cancel)
            .log()
            .take(1)
            .singleOrEmpty();
        return oneRecord;
    }
}
```
The main reason the emission of Mono<T> still needs to be managed, even though it contains at most one piece of data, is that in some circumstances it may be empty.
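To see how the empty case behaves, here is a minimal standalone sketch (not part of the service above, and with no DAO involved): singleOrEmpty() yields an empty Mono when nothing was emitted, and a fallback such as defaultIfEmpty() can supply a value for that case:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class EmptyMonoDemo {

    // Takes the first element of a source that may be empty,
    // falling back to "none" when no element arrives.
    static Mono<String> firstOrFallback(Flux<String> source) {
        return source.take(1)
                     .singleOrEmpty()          // completes empty if the source was empty
                     .defaultIfEmpty("none");  // fallback value for the empty case
    }

    public static void main(String[] args) {
        System.out.println(firstOrFallback(Flux.just("a", "b")).block()); // a
        System.out.println(firstOrFallback(Flux.empty()).block());        // none
    }
}
```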
- Next, the method selectSomeEmpRecords() skips the first two Stream objects and then transmits only those emitted within a specific time window:
```java
@Override
public Flux<Employee> selectSomeEmpRecords() {
    Flux<Employee> takeSomeRecs = Flux.fromIterable(employeeDaoImpl.getEmployees())
        .log()
        .skip(2)
        .take(Duration.ofMillis(4));
    return takeSomeRecs;
}
```
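The counting half of this pairing is easy to verify in isolation. The following sketch uses plain integers instead of the DAO, and leaves the duration-based take() aside since its outcome depends on wall-clock timing; skip(2) simply drops the first two emissions:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SkipDemo {
    public static void main(String[] args) {
        List<Integer> remaining = Flux.range(1, 6)
                .skip(2)            // drop the first two elements of the stream
                .collectList()      // gather the rest for inspection
                .block();
        System.out.println(remaining); // [3, 4, 5, 6]
    }
}
```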
- The third method to implement emits groups of Stream objects wherein each group contains n elements:
```java
@Override
public Flux<List<Employee>> getEmployeesByBatch() {
    Flux<List<Employee>> recordsByBatch = Flux.fromIterable(employeeDaoImpl.getEmployees())
        .log()
        .buffer(2);
    return recordsByBatch;
}
```
The operator buffer(n) returns a Stream of Collection objects, where each Collection contains one group of n Stream objects.
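To make the grouping concrete, here is a minimal sketch with plain integers rather than Employee records: buffer(2) packs consecutive elements into List instances, with a shorter final list when the element count is not an exact multiple of the group size:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class BufferDemo {
    public static void main(String[] args) {
        List<List<Integer>> batches = Flux.range(1, 5)
                .buffer(2)          // group every 2 consecutive elements into a List
                .collectList()      // gather the batches for inspection
                .block();
        System.out.println(batches); // [[1, 2], [3, 4], [5]]
    }
}
```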
- One of the most essential operators for executing expensive or unstable sources of data is defer(). It delays the creation of the Publisher<T> until subscription time: the given Supplier is invoked once per subscriber, so the expensive source is built lazily and each subscriber receives a fresh Publisher<T>. The following method uses defer() to wrap an assumed heavy and unstable DAO transaction, while filter() applies its Predicate<T> to the resulting data stream:
```java
@Override
public Flux<Employee> getDeferredEmployees() {
    Predicate<Employee> validAge = (e) -> e.getAge() > 25;
    Supplier<Flux<Employee>> deferredTask = () -> Flux.fromIterable(
        employeeDaoImpl.getEmployees());
    Flux<Employee> deferred = Flux.defer(deferredTask)
        .log()
        .filter(validAge);
    return deferred;
}
```
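The key property of defer() is its laziness, which the following standalone sketch makes visible with a counter: the Supplier is not invoked when the Flux is assembled, only when someone subscribes, and it runs once per subscription:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class DeferDemo {

    // Returns how many times the deferred supplier ran after n subscriptions.
    static int supplierRuns(int subscriptions) {
        AtomicInteger calls = new AtomicInteger();
        Flux<Integer> deferred = Flux.defer(() -> {
            calls.incrementAndGet();          // side effect: count each build of the source
            return Flux.just(1, 2, 3);
        });
        // Assembling `deferred` above did NOT run the supplier; only subscribing does.
        for (int i = 0; i < subscriptions; i++) {
            deferred.blockLast();             // each block* call subscribes once
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(supplierRuns(2)); // 2 - one invocation per subscriber
    }
}
```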
- One way to manage defer() operations is to use timeout(), which explicitly sets how long the subscriber waits for the next emission. The following method makes the subscriber wait 300 milliseconds; if no element arrives within that window, the stream terminates with an error and the subscriber's onError() method is invoked:
```java
@Override
public Flux<String> getTimedFirstNames() {
    Function<Employee, String> firstNames = (e) -> e.getFirstName();
    Supplier<Flux<String>> deferredTask = () -> Flux.fromIterable(
        employeeDaoImpl.getEmployees())
        .map(firstNames);
    Flux<String> timedDefer = Flux.defer(deferredTask)
        .log()
        .timeout(Duration.ofMillis(300));
    return timedDefer;
}
```
- To execute this method properly, be sure to override the onError() method of the subscriber in case the timeout elapses. Create TestEmployeeBatchStream and add the following test method:
```java
@Test
public void testTimedFirstNames() {
    employeeBatchStreamserviceImpl.getTimedFirstNames()
        .subscribe(new Subscriber<String>() {

            @Override
            public void onComplete() { }

            @Override
            public void onError(Throwable arg0) {
                System.out.println("time is out....");
            }

            @Override
            public void onNext(String data) {
                System.out.println(data);
            }

            @Override
            public void onSubscribe(Subscription subs) {
                subs.request(Long.MAX_VALUE);
            }
        });
}
```
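The timeout path can also be observed without the DAO by delaying a synthetic source past the deadline. In this sketch, timeout() terminates the stream with a TimeoutException, which the blocking call rethrows wrapped in a RuntimeException:

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Flux;

public class TimeoutDemo {

    // Reports whether a source delayed by `delay` beats a timeout() of `deadline`.
    static String raceAgainstDeadline(Duration delay, Duration deadline) {
        try {
            Flux.just("data")
                .delayElements(delay)    // simulate a slow producer
                .timeout(deadline)       // error out if no element arrives in time
                .blockLast();
            return "completed";
        } catch (RuntimeException ex) {
            // blockLast() rethrows the checked TimeoutException wrapped in a RuntimeException
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            return cause instanceof TimeoutException ? "timed out" : "other error";
        }
    }

    public static void main(String[] args) {
        System.out.println(raceAgainstDeadline(Duration.ofMillis(400), Duration.ofMillis(50)));
        System.out.println(raceAgainstDeadline(Duration.ofMillis(10), Duration.ofMillis(500)));
    }
}
```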
- If there is a need to delay the emission of each element by m seconds within an overall window of 1 hour, the operators take(Duration) and delayElements(Duration) are combined. Combinations of these time-based operations can push any number of data Streams to the subscriber. An empty Stream is also a valid, expected result, so error handling must be ready for these kinds of computations:
```java
@Override
public Flux<Employee> selectEmpDelayed() {
    Supplier<Flux<Employee>> deferredTask = () -> Flux.fromIterable(
        employeeDaoImpl.getEmployees());
    Flux<Employee> oneRecord = Flux.defer(deferredTask)
        .take(Duration.ofHours(1))
        .delayElements(Duration.ofSeconds(10));
    return oneRecord;
}
```
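The pacing effect of delayElements() can be observed directly on a smaller time scale. In this standalone sketch with synthetic data, emissions are spaced 100 milliseconds apart, so three elements take roughly 300 milliseconds overall:

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class DelayDemo {

    // Emits `count` elements paced `delayMs` apart and reports the total elapsed time.
    static long elapsedMillisFor(int count, long delayMs) {
        long start = System.nanoTime();
        Flux.range(1, count)
            .delayElements(Duration.ofMillis(delayMs)) // space emissions apart in time
            .blockLast();                              // wait for the delayed stream to finish
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Three elements spaced 100 ms apart arrive over roughly 300 ms.
        System.out.println(elapsedMillisFor(3, 100) + " ms");
    }
}
```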
- The other way of implementing backpressure is to override the onSubscribe() method of Subscriber<T> and request from Publisher<T> the desired emission rate of the resulting stream. The subscriber makes this request through request(t), indicating that it expects t Stream objects per emission. Open the test class TestEmployeeBatchStream and add the following test case:
```java
@Test
public void testByRequest() {
    Subscriber<Employee> subscriber = new Subscriber<Employee>() {

        @Override
        public void onComplete() { }

        @Override
        public void onError(Throwable arg0) { }

        @Override
        public void onNext(Employee emp) {
            System.out.println(emp);
        }

        @Override
        public void onSubscribe(Subscription subs) {
            subs.request(2);
        }
    };
    employeeBatchStreamserviceImpl.selectSomeEmpRecords()
        .subscribe(subscriber);
}
```
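Reactor also ships BaseSubscriber, which makes this request-driven style less verbose than implementing Subscriber<T> by hand. The following standalone sketch caps demand at a fixed number of elements; because the hooks never request more, the publisher stops emitting once that demand is consumed:

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class RequestDemo {

    // Subscribes with a fixed demand of n and records which elements actually arrive.
    static List<Integer> takeByRequest(Flux<Integer> source, int n) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(new BaseSubscriber<Integer>() {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(n);              // demand exactly n elements, never more
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);     // no further request() here, so emission stops at n
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeByRequest(Flux.range(1, 10), 2)); // [1, 2]
    }
}
```

Overriding hookOnSubscribe() replaces BaseSubscriber's default behavior of requesting an unbounded amount, which is what makes the bounded demand take effect.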