Project Reactor

Reactor is a library for asynchronous programming, developed by Pivotal as an implementation of the Reactive Streams specification. It supports writing high-performance applications and works asynchronously, following the event-driven programming paradigm. Its key feature is backpressure support for asynchronous stream processing. Reactor is a base library for building asynchronous applications, not a runtime environment. It is based on the design pattern in which service requests received from clients are dispatched to different event handlers, where they are processed.

The terminology is very similar to what we already discussed for RxJava. The intention was clear: to provide Reactive Streams with a native library. The library exposes operators that conform to the RxJava API. These operators allow developers to transform and handle an event stream and to perform terminal operations on it. Reactor also has a powerful mechanism for handling errors and exceptions by treating them as events. Its core interfaces are Publisher, Subscriber, Subscription, and Processor.
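
The roles of the four core interfaces, and the request-based backpressure they enable, can be sketched without Reactor itself, because JDK 9 and later ship corresponding interfaces in java.util.concurrent.Flow. The following is a minimal illustration under that assumption and is not how Reactor is implemented; the names CoreInterfacesSketch, OneAtATimeSubscriber, and publishAndCollect are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class CoreInterfacesSketch {

    /** A Subscriber that asks for one item at a time through its Subscription. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // demand the first item only
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // demand the next item once this one is handled
        }

        @Override
        public void onError(Throwable error) {
            done.countDown(); // an error is a terminal signal
        }

        @Override
        public void onComplete() {
            done.countDown(); // normal completion is the other terminal signal
        }
    }

    /** Publishes the given items and returns what the subscriber received. */
    static List<String> publishAndCollect(List<String> items) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("mango", "strawberry")));
    }
}
```

The subscriber, not the publisher, decides how fast items flow: nothing is delivered until request() is called. A Processor, not shown here, is simply a component that is both a Subscriber and a Publisher.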

In the Reactor library, a Publisher is called a Flux. Flux is a term borrowed from chemical reactors, where it refers to the material that flows out of the reactor after a chemical reaction. The Reactor project uses Flux as its event publisher: a Flux emits 0 to N events and internally implements the Publisher interface. The events it publishes are plain Java objects (POJOs). To understand it better, consider the following code:

Flux<String> flux =   
  Flux.just("mango","strawberry","pineapple","papaya"); 

We have just created a Flux on which we can perform operations. The generated Flux can be pictured as shown in the following figure:

The preceding figure shows that Flux has emitted four events of type String using the just() method. Flux has the following factory methods for creation:

  • Empty Flux creation, which is as follows:
Flux<String> createEmptyFlux(){ 
  return Flux.empty(); 
} 
  • Flux containing a few elements in it is written as follows:
Flux<String> createFluxWithValues(){ 
  return Flux.just("mango","orange"); 
} 
  • Creating Flux from a List collection as follows:
Flux<String> createFluxFromList(){ 
  List<String> list = new ArrayList<>(); 
  // code to add objects to the ArrayList 
  return Flux.fromIterable(list); 
} 
  • An error may occur while processing Flux. The following code shows Flux with an error:
Flux<String> erroredFlux(){ 
  return Flux.error(new Exception()); 
} 
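
To see what each of these factory methods actually emits, we can subscribe and record the signals. The following is a minimal sketch, assuming reactor-core is on the classpath; the class FluxFactorySketch and its helper signalsOf are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class FluxFactorySketch {

    /** Subscribes to the Flux and records every signal it sends, in order. */
    static List<String> signalsOf(Flux<String> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
            value -> signals.add("next:" + value),               // one call per element
            error -> signals.add("error:" + error.getMessage()), // terminal error signal
            () -> signals.add("complete"));                      // terminal completion signal
        // The sources used below emit synchronously on subscribe,
        // so the list is fully populated when subscribe() returns.
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(signalsOf(Flux.empty()));
        // [complete]
        System.out.println(signalsOf(Flux.just("mango", "orange")));
        // [next:mango, next:orange, complete]
        System.out.println(signalsOf(Flux.fromIterable(Arrays.asList("apple", "papaya"))));
        // [next:apple, next:papaya, complete]
        System.out.println(signalsOf(Flux.error(new IllegalStateException("boom"))));
        // [error:boom]
    }
}
```

Note that an errored Flux never sends the completion signal: an error and a completion are both terminal, and only one of them is delivered.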

A Flux can carry multiple elements, but sometimes we need a publisher with narrower semantics. The Spring 5 Framework needs a type for streams of 0 to 1 elements, and Mono was introduced for this purpose. A Mono publisher emits at most one element. Mono<Void> denotes an empty sequence. Consider the following code, which creates a Mono publisher with a single event of type String:

Mono<String> createMonoWithValue(){ 
  return Mono.just("mango"); 
} 

The created Mono can be considered as follows:

A Mono can be created using factory methods such as empty(), just(), never(), or error(). Let's discuss these methods one by one:

  • Creating empty Mono:
Mono<String> createEmptyMono(){ 
  return Mono.empty(); 
} 
  • Creating Mono with an event:
Mono<String> createMonoWithValue(){ 
  return Mono.just("mango"); 
} 
  • In certain situations, we need to create Mono that will never signal. The following code demonstrates it:
Mono<String> createMonoWithNoSignal(){ 
  return Mono.never(); 
} 
  • We may need Mono emitting an error. The following code shows Mono with an error:
Mono<String> erroredMono(){ 
  return Mono.error(new Exception()); 
} 
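
The four kinds of Mono behave differently when consumed. The following is a minimal sketch, assuming reactor-core is on the classpath, that maps each case to a readable string; the class MonoFactorySketch, the method describe, and the placeholder strings are hypothetical and exist only for illustration.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

class MonoFactorySketch {

    /** Turns whatever the Mono signals (or fails to signal) into a description. */
    static String describe(Mono<String> mono) {
        return mono
            .defaultIfEmpty("<empty>")                                         // completed without a value
            .onErrorResume(e -> Mono.just("<error: " + e.getMessage() + ">")) // terminated with an error
            .timeout(Duration.ofMillis(200), Mono.just("<no signal>"))         // never signalled anything
            .block();                                                          // wait for the single result
    }

    public static void main(String[] args) {
        System.out.println(describe(Mono.just("mango")));
        System.out.println(describe(Mono.empty()));
        System.out.println(describe(Mono.never()));
        System.out.println(describe(Mono.error(new IllegalStateException("boom"))));
    }
}
```

The timeout is needed only for Mono.never(): without it, block() would wait indefinitely, because a Mono that never signals neither completes nor fails.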

We can combine the generated Mono and Flux publishers with each other and modify their behavior, and then add a subscriber to consume the result. Remember that a Flux emits 0 to N events, whereas a Mono emits 0 or 1. Let's discuss how to process streams with the help of a few cases and the methods that perform each task:

  • Converting the value emitted by a Mono to uppercase:
Mono<String> convertToUpperCase() { 
  Mono<String> mono= Mono.just("mango"); 
  return mono.map(String::toUpperCase); 
} 
  • To compose non-blocking asynchronous steps, we can use flatMap(), which maps each element of a Flux to a Mono and flattens the results back into a Flux:
Flux<String> toUpperCase() { 
  Flux<String> flux = Flux.just("mango","orange","pineapple"); 
  return flux.flatMap(fruit -> Mono.just(fruit.toUpperCase())); 
} 
  • Sometimes we get two or more streams and need to combine them, using either of the methods shown here:
    • Using mergeWith(), which interleaves elements as they arrive:
Flux<String> mergeStreams() { 
  Flux<String> flux1 = Flux.just("mango","orange","pineapple"); 
  Flux<String> flux2 = Flux.just("apple"); 
  return flux1.mergeWith(flux2); 
} 
    • Using concatWith(), which appends the second stream after the first completes:
Flux<String> concatStreams() { 
  Flux<String> flux1 = Flux.just("mango","orange","pineapple"); 
  Flux<String> flux2 = Flux.just("apple"); 
  return flux1.concatWith(flux2); 
} 
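
With purely synchronous sources, mergeWith() and concatWith() tend to produce the same order, so the distinction is easier to see when one source is slow. mergeWith() subscribes to both sources eagerly, while concatWith() subscribes to the second only after the first completes. The following is a minimal sketch, assuming reactor-core is on the classpath; the class CombineSketch, its methods, and the 100 ms delay are hypothetical choices made only for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class CombineSketch {

    /** A deliberately slow source: each element is delayed by 100 ms. */
    static Flux<String> slowFruits() {
        return Flux.just("mango", "orange").delayElements(Duration.ofMillis(100));
    }

    /** Both sources are subscribed at once, so the fast one emits first. */
    static List<String> merged() {
        return slowFruits().mergeWith(Flux.just("apple")).collectList().block();
    }

    /** The second source is subscribed only after the first completes. */
    static List<String> concatenated() {
        return slowFruits().concatWith(Flux.just("apple")).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(merged());       // [apple, mango, orange]
        System.out.println(concatenated()); // [mango, orange, apple]
    }
}
```

If the order of the combined stream matters, concatWith() is the safer choice; mergeWith() fits cases where elements should be handled as soon as any source produces them.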

The API provides many methods for processing a publisher, and most of them are operators.
