Introducing Reactor types

We've mentioned Reactive Streams with little detail. There is a spec for Reactive Streams (http://www.reactive-streams.org/), but it's important to understand that it is quite primitive. In fact, it's so primitive that it's not very effective for building applications. That may sound counterintuitive, but it wasn't written so much for end users as it was for framework developers. To build reactive applications, we'll use Project Reactor (http://projectreactor.io/), the core library that Spring Framework 5 uses for its reactive programming model.
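To get a feel for how low-level the spec is, here is a minimal sketch of a hand-written subscriber. It assumes JDK 9 or later and uses java.util.concurrent.Flow and SubmissionPublisher, the JDK's copy of the Reactive Streams interfaces, rather than anything from Reactor. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class RawSubscriberSketch {

    /** Hand-written subscriber: every signal and every demand request is our job. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);   // ask for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1);   // ask for the next item
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the values, waits for completion, and returns what the subscriber saw. */
    static List<String> collect(String... values) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String value : values) {
                publisher.submit(value);
            }
        } // close() signals onComplete once the submitted items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(collect("alpha", "bravo", "charlie"));
    }
}
```

Nothing here transforms, filters, or combines values; that gap is what a library like Reactor fills.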

To introduce Reactor's core types, we'll begin with the one we just saw in the previous section, Flux, and some code like this:

    Flux.just("alpha", "bravo", "charlie"); 

This simple creation of a Reactor Flux can be detailed as follows:

  • Flux is Reactor's base type, a container holding 0..N items, none of which will be reached until the client calls the reactive stream's subscribe() method. In this case, the container holds a set of strings.
  • just() is a static helper method to construct a fixed collection. Other static helpers are also available, like fromArray(), fromIterable(), and fromStream(). This makes it easy to bridge existing Java collections.
There are additional methods to convert a Flux to a Java Stream and an Iterable. But since these types are generally blocking, it's best to avoid them if possible.
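The helpers mentioned above can be sketched as follows. This is an illustration only: the class and method names are hypothetical, and block() and toIterable() are used purely to pull the results back into plain collections for inspection, which is exactly the kind of blocking call to avoid in real reactive code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;

public class FluxBridgingSketch {

    static List<String> fromArray() {
        String[] words = {"alpha", "bravo", "charlie"};
        return Flux.fromArray(words).collectList().block();
    }

    static List<String> fromIterable() {
        List<String> words = Arrays.asList("alpha", "bravo", "charlie");
        return Flux.fromIterable(words).collectList().block();
    }

    static List<String> fromStream() {
        return Flux.fromStream(Stream.of("alpha", "bravo", "charlie"))
            .collectList()
            .block();
    }

    /** Going the other way: toIterable() blocks the calling thread while iterating. */
    static List<String> backToIterable() {
        List<String> copy = new ArrayList<>();
        for (String word : Flux.just("alpha", "bravo", "charlie").toIterable()) {
            copy.add(word);
        }
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(fromArray());
        System.out.println(fromIterable());
        System.out.println(fromStream());
        System.out.println(backToIterable());
    }
}
```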

Exactly what does a Flux embody? How is it different from a Java List or Stream? A Flux represents multiple values that will arrive in the future, asynchronously. When those values will arrive is not specified, nor can we assume they will all arrive on the same thread.

In the past, Java has made it possible to represent either a single value or a collection of values that are available right now through synchronous, blocking APIs. We've also had single-value types for asynchronous values (Future and CompletableFuture). But Java has yet to create a value type for multiple, asynchronous values. That is what Project Reactor and Reactive Streams are all about--processing multiple, asynchronous, non-blocking values in a cohesive fashion.

To consume a Flux, we have to either subscribe or let the framework do it for us. Here's an example of subscribing for the results:

    Flux.just("alpha", "bravo", "charlie") 
     .subscribe(System.out::println); 

This code creates a Flux with three items, subscribes for the results, and prints each value to the screen as follows:

alpha
bravo
charlie  

This may not appear impressive, especially when compared to the existing Java collection builders like Arrays.asList("alpha", "bravo", "charlie"). Looks the same, right?

A difference can be seen when we start leveraging Java 8 lambdas and function types. That's when we can chain together a series of function calls, all of which are delayed until that exact element is extracted. Look at the following fragment:

    Flux.just( 
      (Supplier<String>) () -> "alpha", 
      (Supplier<String>) () -> "bravo", 
      (Supplier<String>) () -> "charlie") 
       .subscribe(supplier -> System.out.println(supplier.get())); 

This Flux contains the same values as our earlier Flux.just(), except that each one is wrapped inside a Java 8 Supplier. This means that retrieving each value is delayed until subscription, and even then happens only when that individual value is delivered through the subscriber's onNext() method. This behavior is known as being lazy.
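One way to watch this laziness is to count how often the suppliers are actually invoked. The following is a small sketch with hypothetical names; it relies on the fact that Flux.just() with no scheduler involved delivers its items on the subscribing thread, so the counter can be read right after subscribe() returns.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

public class LazySupplierSketch {

    /** Returns {supplier calls before subscribe, supplier calls after subscribe}. */
    static int[] countSupplierCalls() {
        AtomicInteger calls = new AtomicInteger();

        Supplier<String> alpha = () -> { calls.incrementAndGet(); return "alpha"; };
        Supplier<String> bravo = () -> { calls.incrementAndGet(); return "bravo"; };

        // Assembling the flow does not call any supplier.
        Flux<String> values = Flux.just(alpha, bravo).map(Supplier::get);
        int before = calls.get();

        // Subscribing pulls each element through map(), invoking its supplier.
        values.subscribe(value -> { });
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] result = countSupplierCalls();
        System.out.println("before subscribe: " + result[0]);
        System.out.println("after subscribe: " + result[1]);
    }
}
```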

Sure, this example is contrived, but we'll see more of this paradigm as we explore reactive programming throughout this book.

Another facet of Project Reactor is its set of over 160 operations rooted in functional programming, including some of the best-known ones such as map, flatMap, filter, and then.

To wrap up this section, let's pick a slightly more complex example. What if we took the sample data that we have been poking at and counted how many of each letter we have? Check it out:

    Flux.just("alpha", "bravo", "charlie") 
      .map(String::toUpperCase) 
      .flatMap(s -> Flux.fromArray(s.split(""))) 
      .groupBy(String::toString) 
      .sort((o1, o2) -> o1.key().compareTo(o2.key())) 
      // zipWith() pairs key and count in a Tuple2; since Reactor 3.1, and() returns Mono<Void> 
      .flatMap(group -> Mono.just(group.key()).zipWith(group.count())) 
      .map(keyAndCount -> 
        keyAndCount.getT1() + " => " + keyAndCount.getT2()) 
      .subscribe(System.out::println); 

We can take apart this preceding flow as follows:

  • This flow starts with the same values as shown earlier in this chapter, alpha, bravo, and charlie bundled into a Reactor Flux.
  • Each entry is converted to uppercase using String::toUpperCase ensuring we'll count lowers and uppers together.
  • The entries are then flatMapped into individual letters. To visualize flatMapping, look at this example--["alpha", "bravo"] is mapped by s.split("") into a collection of collections, [["a", "l", "p", "h", "a"], ["b", "r", "a", "v", "o"]], and then flattened into a single collection, ["a", "l", "p", "h", "a", "b", "r", "a", "v", "o"].
  • Then we group by the string value, which will combine all the "a" entries into one subgroup, and so on and so forth.
  • Next, we sort by the key value, supplying our own comparator because the group type doesn't implement Comparable.
The underlying type emitted by groupBy() is a GroupedFlux, a Flux with a key value that doesn't implement Comparable.
  • We flatMap each group into a single Mono that pairs the group's key with its count. (More on Mono further in this chapter.)
  • We unpack the tuple, and convert it into a string showing key and count.
  • We subscribe to the entire flow, printing out the results.

The output can be seen as follows:

A => 4
B => 1
C => 1
E => 1
H => 2
I => 1
L => 2
O => 1
P => 1
R => 2
V => 1

Now that's a lot to take in all at once. Reactor flows, much like Java 8 streams, can pack in a lot of functionality. But that is their key benefit. By spending little time on language ceremony, we can focus instead on strong functional definitions. If needed, read each step in that flow again, using the bullet points to help decode it.

After chatting about Flux and all of its operations, something else has leaked into our code--Mono. What is that? It's a Reactor container for 0..1 items, a narrower counterpart to Flux. It implements the same Reactive Streams interface, Publisher, which means that we only get its results when we invoke subscribe(). It has a few API differences from Flux, like flatMap() versus flatMapMany(), but apart from that, it is not hard to grok.
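The flatMap() versus flatMapMany() difference can be sketched like this, assuming Reactor 3.1 or later, where Mono's flatMap() yields another Mono and flatMapMany() widens the result to a Flux. The class and method names are illustrative, and block() is used only to inspect the results.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoSketch {

    /** Mono.flatMap(): one value in, at most one value out. */
    static String upperCased(String word) {
        Mono<String> result = Mono.just(word)
            .flatMap(w -> Mono.just(w.toUpperCase()));
        return result.block();
    }

    /** Mono.flatMapMany(): one value in, 0..N values out as a Flux. */
    static List<String> letters(String word) {
        Flux<String> result = Mono.just(word)
            .flatMapMany(w -> Flux.fromArray(w.split("")));
        return result.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(upperCased("alpha"));
        System.out.println(letters("alpha"));
    }
}
```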

A lot of use cases involve handling single values, which makes it worthwhile to capture this type. In the flow we just walked through, the count() of a group is stored in a Mono<Long>, indicating that we can't know the value until the subscribe is applied at the end. So we have to bundle it up along with the key and map over it to effectively unpack it.
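That bundle-then-unpack step can be isolated in a short sketch. It assumes a Reactor version that provides Mono.zipWith(), and the helper name keyAndCount is hypothetical; block() appears only so the result can be returned as a plain string.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class KeyAndCountSketch {

    /** Pairs a key with the (deferred) number of letters equal to it. */
    static String keyAndCount(String key, String... letters) {
        // count() does not return a long; it returns a Mono<Long> resolved on subscription.
        Mono<Long> count = Flux.fromArray(letters)
            .filter(key::equals)
            .count();

        // Bundle key and count into one Mono<Tuple2>, then map over the tuple to unpack it.
        Mono<Tuple2<String, Long>> pair = Mono.just(key).zipWith(count);
        return pair.map(t -> t.getT1() + " => " + t.getT2()).block();
    }

    public static void main(String[] args) {
        System.out.println(keyAndCount("A", "A", "L", "P", "H", "A"));
    }
}
```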

Given that we just walked through a chain of Reactor operations, it's handy to review some of the most commonly used ones. Look at this quick guide:

Operation    Description
---------    -----------
map()        Converts one Flux into another Flux of identical size using a function applied to each element
flatMap()    Converts one Flux into another Flux of a different size by first mapping, and then removing any nesting
filter()     Converts one Flux into a smaller Flux with elements removed based on a filtering function
groupBy()    Converts the Flux into a bundled set of subgroups based on the grouping function
sort()       Converts one Flux into a sorted Flux based on the sorting function

Several of the operations in the preceding table also exist for Mono. There are others, but these are the big ones.
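As a rough illustration of the operations in the guide, the sketch below runs each of them over the same three words. The class and method names are invented for this example, and the final sort() in groupSizes() is there only to make the order of the results predictable.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class OperatorGuideSketch {

    /** map + filter + sort: same-size transform, then a smaller Flux, then ordering. */
    static List<String> upperCasedWithR() {
        return Flux.just("charlie", "alpha", "bravo")
            .map(String::toUpperCase)
            .filter(s -> s.contains("R"))
            .sort()
            .collectList()
            .block();
    }

    /** flatMap: three words become one Flux of individual letters. */
    static long letterCount() {
        return Flux.just("alpha", "bravo", "charlie")
            .flatMap(s -> Flux.fromArray(s.split("")))
            .count()
            .block();
    }

    /** groupBy: bundle words by length, then report the size of each group. */
    static List<String> groupSizes() {
        return Flux.just("alpha", "bravo", "charlie")
            .groupBy(String::length)
            .flatMap(group -> group.count()
                .map(size -> group.key() + " letters: " + size))
            .sort()
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(upperCasedWithR());
        System.out.println(letterCount());
        System.out.println(groupSizes());
    }
}
```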

What's the big picture in all this? Essentially, every step of this flow could be an asynchronous, non-blocking, remote call to another service. With Reactor, we don't have to worry about thread management unless we really want to get into it. It's handled for us. And soon, we'll start doing just that.
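For the cases where we do want to get into thread management, Reactor exposes schedulers. The following is a minimal sketch, not a recommendation: it uses publishOn() with Schedulers.parallel() to move the downstream steps onto one of Reactor's worker threads, and reports which thread ran the map() step. The class and method names are hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulerSketch {

    /** Returns the name of the thread that executed the map() step. */
    static String threadSeenByMap() {
        return Flux.just("alpha")
            .publishOn(Schedulers.parallel())               // hop to a Reactor worker thread
            .map(value -> Thread.currentThread().getName())
            .blockFirst();                                  // blocking only to inspect the result
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("map ran on: " + threadSeenByMap());
    }
}
```

The flow itself is written the same way whether or not a scheduler is involved; only the publishOn() line changes where the work happens.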

There's a myth that is possibly as old as Java itself: To make things run faster, we must use threads. And the corollary would be: The more threads, the faster. But this is not borne out by empirical research. In fact, using threads can lead to concurrency faults, and using too many threads can introduce context-switching overhead. JavaScript developers, in an environment where there is but one thread, have developed many reactive solutions that are very efficient at handling things. That is because using queues and event loops, combined with asynchronous, non-blocking APIs that don't hold up the thread, actually results in accomplishing a lot with few resources.
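The queue-plus-one-thread idea can be approximated with nothing but the JDK. This is a loose sketch under that assumption, not how Reactor or a JavaScript runtime is implemented: a single-threaded executor plays the role of the event loop, its internal queue holds the pending work, and none of the tasks block. The names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadLoopSketch {

    /** Runs many small tasks on one thread; returns {sum of results, distinct threads used}. */
    static int[] run(int tasks) {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        Set<String> threadsUsed = ConcurrentHashMap.newKeySet();
        try {
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final int n = i;
                // Each task is queued and later executed by the single loop thread.
                pending.add(CompletableFuture.supplyAsync(() -> {
                    threadsUsed.add(Thread.currentThread().getName());
                    return n * 2;
                }, loop));
            }
            int sum = 0;
            for (CompletableFuture<Integer> future : pending) {
                sum += future.join();
            }
            return new int[] {sum, threadsUsed.size()};
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = run(1000);
        System.out.println("sum = " + result[0] + ", threads = " + result[1]);
    }
}
```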

If this introductory taste of Project Reactor, Flux, and Mono is still confusing, please read the following blog articles for more detailed information on reactive programming:
