Understanding Flux and Mono

As I said, Reactor is another fourth-generation reactive library, like ReactiveX. It started out as a lightweight version of Rx; over time it grew, and today it is nearly as heavyweight as ReactiveX.

It also has producer and consumer types, just like Rx. It has Flux, which is similar to Flowable, and Mono, which combines the roles of Single and Maybe.

Note that when describing Flux, I said Flowable, not Observable. You can probably guess the reason: all Reactor types are backpressure-enabled. Both Flux and Mono directly implement the Reactive Streams Publisher interface.
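To make the backpressure point concrete, here is a minimal sketch in which the subscriber asks for one item at a time instead of an unbounded amount. The helper name consumeOneByOne and the recorded event strings are made up for this illustration; BaseSubscriber and its hooks come from Reactor itself. The helper accepts a plain Reactive Streams Publisher, so a Flux can be passed to it directly.

```kotlin
import org.reactivestreams.Publisher
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Hypothetical helper (not part of Reactor): pulls items one by one
// and records what happened, so the demand signals become visible.
fun consumeOneByOne(source: Publisher<Int>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            events += "request 1"
            request(1) // ask for a single item instead of an unbounded amount
        }

        override fun hookOnNext(value: Int) {
            events += "got $value"
            events += "request 1"
            request(1) // signal demand for the next item only after handling this one
        }

        override fun hookOnComplete() {
            events += "complete"
        }
    })
    return events
}

fun main() {
    // Flux.range emits synchronously here, so the list is complete on return.
    consumeOneByOne(Flux.range(1, 3)).forEach(::println)
}
```

The producer never emits more than the subscriber has requested, which is the contract that Flowable follows in Rx as well.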

Flux is a Reactor producer that can emit any number of items (from zero to N) and then terminates either successfully or with an error. Mono, similarly, emits at most one item: it may emit a single item or none at all. So, what are we waiting for? Let's get started with Flux and Mono.

Consider the following code example:

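    import java.util.function.Consumer
    import reactor.core.publisher.Flux

    fun main(args: Array<String>) {
        // Create a Flux that emits three items and then completes
        val flux = Flux.just("Item 1", "Item 2", "Item 3")
        // Subscribe with a Consumer that handles each emitted item
        flux.subscribe(object : Consumer<String> {
            override fun accept(item: String) {
                println("Got Next $item")
            }
        })
    }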

The output is as follows:

    Got Next Item 1
    Got Next Item 2
    Got Next Item 3

The output, as well as the program, is quite similar to RxKotlin, isn't it? The only difference is that we are using Flux instead of Flowable.
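The subscriber above handles only the emitted items. Since a Flux also terminates, either successfully or with an error, the following sketch subscribes with three callbacks, one per kind of signal. The helper name collectSignals and the "boom" error are invented for the illustration; the three-argument subscribe overload is part of Reactor.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper (not part of Reactor): subscribes and records
// every signal the Flux sends, including the terminal one.
fun collectSignals(flux: Flux<String>): List<String> {
    val signals = mutableListOf<String>()
    flux.subscribe(
        { item -> signals += "next: $item" },               // called for each item
        { error -> signals += "error: ${error.message}" },  // called if the Flux fails
        { signals += "complete" }                           // called on successful completion
    )
    return signals
}

fun main() {
    val ok = Flux.just("Item 1", "Item 2")
    // Emits one item, then terminates with an error instead of completing.
    val failing = Flux.just("Item 1")
        .concatWith(Flux.error<String>(IllegalStateException("boom")))

    println(collectSignals(ok))
    println(collectSignals(failing))
}
```

A Flux sends exactly one terminal signal, so the second list ends with the error entry and never contains "complete".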

Now, let's take a look at a Mono example:

    import java.util.function.Consumer
    import reactor.core.publisher.Mono
    // toMono ships with reactor-kotlin-extensions; older Reactor versions
    // exposed it as reactor.core.publisher.toMono
    import reactor.kotlin.core.publisher.toMono

    fun main(args: Array<String>) {

        val consumer = object : Consumer<String> { //(1)
            override fun accept(item: String) {
                println("Got $item")
            }
        }

        val emptyMono = Mono.empty<String>() //(2)
        emptyMono
            .log()
            .subscribe(consumer)

        val emptyMono2 = Mono.justOrEmpty<String>(null) //(3)
        emptyMono2
            .log()
            .subscribe(consumer)

        val monoWithData = Mono.justOrEmpty<String>("A String") //(4)
        monoWithData
            .log()
            .subscribe(consumer)

        val monoByExtension = "Another String".toMono() //(5)
        monoByExtension
            .log()
            .subscribe(consumer)
    }

Before we describe the program line by line, let's first focus on the log operator in each of the subscriptions. The Reactor framework understands a developer's need to log things, which is why it provides an operator that logs every event within a Flux or Mono.

On comment (1), we created a Consumer instance to use in all the subscriptions. On comment (2), we created an empty Mono with the Mono.empty() factory method. As the name suggests, this factory method creates an empty Mono.

On comment (3), we created another empty Mono with Mono.justOrEmpty(); this method creates a Mono with the value passed, or an empty Mono if null is passed as the value.

On comment (4), we created a Mono with the same factory method, but this time with a String value passed.

On comment (5), we created a Mono with the help of the toMono extension function.
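To see the difference between an empty Mono and one that carries a value without reading log output, here is a small sketch that blocks on each Mono and inspects the result. The describe helper is hypothetical and only for illustration; blocking like this is handy in a demo but is usually avoided in reactive code.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical helper (not part of Reactor): waits for the Mono to
// terminate and reports whether it produced a value.
fun describe(mono: Mono<String>): String {
    val value: String? = mono.block() // block() returns null for an empty Mono
    return if (value == null) "empty" else "value: $value"
}

fun main() {
    val missing: String? = null

    println(describe(Mono.empty<String>()))
    println(describe(Mono.justOrEmpty(missing)))    // null argument, so the Mono is empty
    println(describe(Mono.justOrEmpty("A String"))) // non-null argument, so the Mono has a value
}
```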

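In the output of the program, the log operator reports the signals of each Mono. The two empty Monos log only onSubscribe, the request, and onComplete, so the consumer is never called. The last two also log onNext, and the consumer prints Got A String and Got Another String.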

Now that you have learned about Spring and about reactive programming with Reactor, would you like to do some research yourself and make our API reactive? As a helping gesture, I would suggest that you study a little bit about WebFlux. You can also read through Reactive Programming in Spring 5.0 by Oleh Dokuka and Igor Lozynskyi (https://www.packtpub.com/application-development/reactive-programming-spring-50).
