After the generic API classes, let us deal with the specific APIs of Spring Reactor 3.0 by using the following steps:
- Create a service interface, EmployeeNativeStreamservice, that contains some non-DAO-related services demonstrating how to use the Stream types. The following are the abstract methods to be implemented:
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public interface EmployeeNativeStreamservice {
        public Mono<String> processFormUser(String name);
        public Flux<String> getFormUsers(String... names);
        public Flux<Integer> getAllAge(Integer[] age);
    }
- The first method to implement is processFormUser(), which accepts a username and runs a chain of operators to format the original String value. Trivial as it may be, this method shows how to process a single object through Mono<T> using several operations that run synchronously on the calling thread (the main thread in a simple test). It also introduces some of the core Mono<T> side-effect operations: doOnNext(), doOnSuccess(), and doOnError(). The doOnNext() operation is essential because it is triggered whenever the publisher emits data. If a Throwable is raised in any of the operators, the fallback operator onErrorReturn() emits a default value so that the stream exits safely. Create an implementation class, EmployeeNativeStreamserviceImpl, with the following code:
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Predicate;

    import org.springframework.stereotype.Service;

    import reactor.core.publisher.Mono;

    @Service
    public class EmployeeNativeStreamserviceImpl implements EmployeeNativeStreamservice {

        @Override
        public Mono<String> processFormUser(String name) {
            Function<String, String> upper = (str) -> str.toUpperCase();
            Predicate<String> longName = (str) -> str.length() > 5;
            Consumer<String> success = (str) ->
                System.out.println("successfully processed: " + str);
            Consumer<Throwable> error = (e) ->
                System.out.println("encountered an error: " + e.getMessage());
            Consumer<String> onNext = (s) -> System.out.println("approved: " + s);

            Mono<String> makeoverName = Mono.just(name)
                .filter(longName)               // keep only names longer than five characters
                .map(upper)                     // convert the name to uppercase
                .doOnSuccess(success)
                .doOnError(error)
                .doOnNext(onNext)
                .onErrorReturn("invalid Name"); // fallback value if an operator fails
            return makeoverName;
        }
    }
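The effect of the filter() and map() steps in processFormUser() can be previewed without Reactor. The following is only a plain-JDK analogy, in which Optional stands in for Mono<String>; the class NameFormatSketch and its format() helper are hypothetical names introduced for illustration. It suggests that a name of five characters or fewer is filtered out (an empty result) rather than replaced by the "invalid Name" fallback, which is reserved for errors.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-JDK analogy only: Optional stands in for Mono<String>; no Reactor types are used.
class NameFormatSketch {

    // The same lambdas that processFormUser() declares
    static final Predicate<String> LONG_NAME = str -> str.length() > 5;
    static final Function<String, String> UPPER = str -> str.toUpperCase();

    // Hypothetical helper: applies the filter first, then the mapping
    static Optional<String> format(String name) {
        return Optional.of(name).filter(LONG_NAME).map(UPPER);
    }

    public static void main(String[] args) {
        System.out.println(format("alexander")); // prints Optional[ALEXANDER]
        System.out.println(format("joe"));       // prints Optional.empty
    }
}
```

In the reactive version, the empty case corresponds to a Mono<String> that completes without emitting a value, so doOnNext() is not triggered for short names.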
- Secondly, add the getFormUsers() method shown in the following snippet. It accepts user names through a variable argument list, appends the verification tag ---VALID USER to each one, and sorts the results in ascending order. To cover the case of an empty input, defaultIfEmpty() emits a default value when the resulting Stream has no elements. Flux<T> also provides doOnComplete(), which runs when the sequence completes normally, and doOnTerminate(), which runs when the sequence ends either normally or with an error. Here, Flux.just() converts the array of elements into a Stream:
    // Requires: import java.util.Comparator; import reactor.core.publisher.Flux;
    @Override
    public Flux<String> getFormUsers(String... names) {
        // appends the verification tag to each name
        Function<String, String> tagValid = (str) -> str.concat("---VALID USER");
        Comparator<String> ascSort = (str1, str2) -> str1.compareTo(str2);
        Runnable complete = () -> {
            System.out.println("completed processing");
        };
        // doOnTerminate() fires on both normal completion and error
        Runnable terminate = () -> {
            System.out.println("terminated with problems");
        };
        Consumer<String> onNext = (s) -> System.out.println("validated: " + s);

        Flux<String> userNames = Flux.just(names)
            .map(tagValid)
            .sort(ascSort)
            .defaultIfEmpty("empty list")
            .doOnNext(onNext)
            .doOnComplete(complete)
            .doOnTerminate(terminate)
            .doOnError(Exception.class,
                (e) -> System.out.println("exits gracefully"));
        return userNames;
    }
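To see which values the map(), sort(), and defaultIfEmpty() steps of getFormUsers() would produce, the same transformations can be sketched with java.util.stream instead of Reactor. This is an analogy under that stated substitution, not the service itself; UserTagSketch and tagAndSort() are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Plain-JDK analogy only: a List stands in for the values a Flux<String> would emit.
class UserTagSketch {

    // Hypothetical helper mirroring map -> sort -> defaultIfEmpty
    static List<String> tagAndSort(String... names) {
        Comparator<String> ascSort = (str1, str2) -> str1.compareTo(str2);
        List<String> tagged = Arrays.stream(names)
                .map(str -> str.concat("---VALID USER")) // append the verification tag
                .sorted(ascSort)                         // ascending order
                .collect(Collectors.toList());
        // Counterpart of defaultIfEmpty("empty list")
        return tagged.isEmpty() ? Collections.singletonList("empty list") : tagged;
    }

    public static void main(String[] args) {
        System.out.println(tagAndSort("zoe", "adam")); // prints [adam---VALID USER, zoe---VALID USER]
        System.out.println(tagAndSort());              // prints [empty list]
    }
}
```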
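- Lastly, implement getAllAge(), which accepts an array of Integers, adds 10 to each age, and uses log() to trace the Publisher-Subscriber signals exchanged on the main thread. Also present in this snippet is retryWhen(), which resubscribes to the source after an error whenever its companion publisher emits a value. The companion shown here is a plain Flux.range() with no time delay; delay-based error recovery would require a delaying operator in the companion: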
    @Override
    public Flux<Integer> getAllAge(Integer[] age) {
        Function<Integer, Integer> addBufferAge = (a) -> a + 10;
        Flux<Integer> allAges = Flux
            .just(age)
            .map(addBufferAge)
            // resubscribe on error each time the companion Flux emits a value
            .retryWhen(errors -> Flux.range(10, 100)
                .flatMap(i -> Flux.just(i).map(addBufferAge)))
            .log("Adding 10", java.util.logging.Level.INFO);
        return allAges;
    }
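The idea of recovering from an error by retrying after a time delay can be illustrated without Reactor. The following is a minimal blocking sketch in plain Java, not Reactor's retryWhen() and not a replacement for it; RetrySketch, retryWithDelay(), and its parameters are hypothetical names chosen for this example.

```java
import java.util.function.Supplier;

// Plain-JDK illustration of retry with a fixed delay; it blocks the calling thread.
class RetrySketch {

    // Hypothetical helper: runs the task up to maxAttempts times,
    // sleeping delayMillis between failed attempts.
    static <T> T retryWithDelay(Supplier<T> task, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure in case every attempt fails
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        throw last; // all attempts failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls, then succeeds
        String result = retryWithDelay(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "recovered";
        }, 5, 100L);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints recovered after 3 attempts
    }
}
```

A reactive pipeline would express the same policy without blocking, by delaying the companion publisher's emissions instead of sleeping.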
- Add the following Logback dependency for the log() operator:
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
- Save EmployeeNativeStreamserviceImpl. Create a test class to execute all three service methods.