How to do it...

After the generic API classes, let us deal with the specific APIs of Spring Reactor 3.0 by using the following steps:

  1. Create a service interface, EmployeeNativeStreamservice, containing some non-DAO services that demonstrate how to use the Stream types. It declares the following abstract methods:
public interface EmployeeNativeStreamservice { 
  public Mono<String> processFormUser(String name); 
  public Flux<String> getFormUsers(String... names); 
  public Flux<Integer> getAllAge(Integer[] age); 
} 
  1. The first method to implement is processFormUser(), which accepts a username and applies a chain of operators to format the original String value. Trivial as it may be, this method shows how to process a single object through Mono<T>; because no scheduler is specified, the operators run synchronously on the subscribing thread, which is the main thread here. It also introduces some of the core callback operations, namely doOnNext(), doOnSuccess(), and doOnError(). The doOnNext() operation is essential because it is triggered whenever the publisher emits data. If any operator signals a Throwable, onErrorReturn() supplies a fallback value so that the stream exits safely. Create an implementation class, EmployeeNativeStreamserviceImpl, with the following code:
@Service 
public class EmployeeNativeStreamserviceImpl  
implements EmployeeNativeStreamservice { 
    
  @Override 
  public Mono<String> processFormUser(String name) { 
    Function<String,String> upper = (str) -> str.toUpperCase(); 
    Predicate<String> longName = (str) -> str.length() > 5; 
    Consumer<String> success = (str) ->
        System.out.println("successfully processed: " + str);
    Consumer<Throwable> error = (e) ->
        System.out.println("encountered an error: " + e.getMessage());
    Consumer<String> onNext = (s) ->
        System.out.println("approved: " + s);
    Mono<String> makeoverName = Mono.just(name)
        .filter(longName)
        .map(upper)
        .doOnSuccess(success)
        .doOnError(error)
        .doOnNext(onNext)
        .onErrorReturn("invalid Name");
    return makeoverName;
  }
}
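One detail of the pipeline above is easy to miss: when the predicate in filter() rejects the name, the Mono simply completes empty, and an empty result is not an error, so the onErrorReturn() fallback is not what the subscriber receives. The following is a minimal plain-Java sketch of that filter-then-map logic. It is only an analogy: java.util.Optional stands in for Mono<String>, and the class and method names (FormUserSketch, format) are hypothetical and not part of the recipe or of Reactor.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration class; Optional is used here as a stand-in for Mono.
public class FormUserSketch {

    // Mirrors filter(longName).map(upper) from processFormUser().
    static Optional<String> format(String name) {
        Predicate<String> longName = (str) -> str.length() > 5;
        Function<String, String> upper = (str) -> str.toUpperCase();
        return Optional.of(name)
                .filter(longName)   // names of 5 characters or fewer are dropped
                .map(upper);        // only runs when a value is still present
    }

    public static void main(String[] args) {
        System.out.println(format("sherwin")); // prints Optional[SHERWIN]
        System.out.println(format("anna"));    // prints Optional.empty
    }
}
```

If a short name should produce a visible fallback instead of an empty result, an operator such as defaultIfEmpty() is the usual place to supply it.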
  1. Secondly, add the getFormUsers() method shown in the following snippet. It accepts an array of usernames through a variable argument list, appends the verification tag ---VALID USER to each name, and then sorts the results in ascending order. To handle an empty result, it invokes defaultIfEmpty(), which emits a default value if the resulting Stream is empty. The snippet also introduces the Flux<T> callbacks doOnComplete(), which runs when the Stream completes successfully, and doOnTerminate(), which runs when the Stream ends either by completing or with an error. Flux.just() is what converts the array of elements into a Stream:
@Override
public Flux<String> getFormUsers(String... names) {
      // Despite its name, this function appends the tag; it does not change case
      Function<String,String> upper =
          (str) -> str.concat("---VALID USER");
      Comparator<String> ascSort =
          (str1, str2) -> str1.compareTo(str2);
      Runnable complete = () -> {
         System.out.println("completed processing");
      };
      // doOnTerminate() fires on normal completion as well as on error
      Runnable terminate = () -> {
         System.out.println("terminated with problems");
      };
      Consumer<String> onNext =
          (s) -> System.out.println("validated: " + s);
      Flux<String> userNames = Flux.just(names)
         .map(upper)
         .sort(ascSort)
         .defaultIfEmpty("empty list")
         .doOnNext(onNext)
         .doOnComplete(complete)
         .doOnTerminate(terminate)
         .doOnError(Exception.class,
             (e) -> System.out.println("exits gracefully"));
      return userNames;
}
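To make the data transformation in getFormUsers() easier to follow, here is a minimal plain-Java sketch of the same tag, sort, and default-if-empty steps. It uses java.util.stream instead of Flux, so it says nothing about Reactor's signals or callbacks; the class and method names (FormUsersSketch, tagAndSort) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; a java.util.stream pipeline stands in for Flux.
public class FormUsersSketch {

    static List<String> tagAndSort(String... names) {
        List<String> tagged = Arrays.stream(names)
                .map(name -> name.concat("---VALID USER")) // same tag as the recipe
                .sorted()                                  // natural order, like compareTo()
                .collect(Collectors.toList());
        // Mirrors defaultIfEmpty("empty list"): substitute one value when nothing was emitted
        return tagged.isEmpty() ? Arrays.asList("empty list") : tagged;
    }

    public static void main(String[] args) {
        System.out.println(tagAndSort("mary", "anna"));
        // prints [anna---VALID USER, mary---VALID USER]
        System.out.println(tagAndSort());
        // prints [empty list]
    }
}
```

Note that sorting needs the whole input before it can produce its first element, which is why the tagged names only appear downstream once the source has finished.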
  1. Lastly, implement getAllAge(), which accepts an array of Integers, adds 10 to each age, and uses log() to trace the Publisher-Subscriber signals exchanged on the main thread. Also present in this snippet is retryWhen(), which drives error recovery through a companion publisher; a time delay is a common choice for that companion, although this snippet uses a simple range instead:
@Override
public Flux<Integer> getAllAge(Integer[] age) {
      Function<Integer, Integer> addBufferAge =
          (a) -> a + 10;
      Flux<Integer> allAges = Flux
            .just(age)
            .map(addBufferAge)
            .retryWhen(opionFlux -> Flux.range(10, 100)
                    .flatMap(i ->
                     Flux.just(i).map(addBufferAge)))
            .log("Adding 10",
                 java.util.logging.Level.INFO);
      return allAges;
}
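The signals that log() traces follow the Reactive Streams contract: a subscription is handed over, demand is requested, items arrive through onNext, and the sequence ends with onComplete or onError. The sketch below illustrates that exchange with the JDK's own java.util.concurrent.Flow interfaces (Java 9 or later) rather than with Reactor, so it is an analogy under stated assumptions and not Reactor's implementation. The class and method names (SignalTraceSketch, agesPlusTen, trace) and the recorded labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration class built on the JDK Flow interfaces, not on Reactor.
public class SignalTraceSketch {

    // A tiny synchronous publisher: emits each age plus 10, then completes.
    static Flow.Publisher<Integer> agesPlusTen(Integer[] ages) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, and never more than the array holds
                while (n-- > 0 && index < ages.length && !done) {
                    Integer next = ages[index++] + 10;
                    subscriber.onNext(next);
                }
                if (index == ages.length && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with unbounded demand and records every signal in order.
    static List<String> trace(Integer[] ages) {
        List<String> signals = new ArrayList<>();
        agesPlusTen(ages).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                signals.add("request(unbounded)");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable throwable) {
                signals.add("onError(" + throwable + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(trace(new Integer[] {20, 30}));
        // prints [onSubscribe, request(unbounded), onNext(30), onNext(40), onComplete]
    }
}
```

Because nothing in this sketch switches threads, every signal is delivered on the calling thread, which matches the recipe's observation that the operations happen on the main thread.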
  1. Add the following Logback Maven dependency, which provides the logging backend used by the log() operator:
<dependency>  
    <groupId>ch.qos.logback</groupId>  
    <artifactId>logback-classic</artifactId>  
    <version>1.2.3</version>  
</dependency> 
  1. Save EmployeeNativeStreamserviceImpl. Create a test class to execute all three service methods.