Creating Flux and Mono sequences

Flux and Mono provide many factory methods to create Reactive Streams based on data that is already available. For example, we may create Flux with object references or from a collection, or we may even create our own lazy range of numbers:

Flux<String>  stream1 = Flux.just("Hello", "world");
Flux<Integer> stream2 = Flux.fromArray(new Integer[]{1, 2, 3});
Flux<Integer> stream3 = Flux.fromIterable(Arrays.asList(9, 8, 7));

It is easy to generate a stream of integers with the range method, where 2010 is a starting point, and 9 is the number of elements in the sequence:

Flux<Integer> stream4 = Flux.range(2010, 9);

This is a handy way to generate a stream of recent years, so the preceding code generates the following stream of integers:

2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018

Mono provides similar factory methods, but mainly targets one element. It is also often used in conjunction with nullable and Optional types:

Mono<String> stream5 = Mono.just("One");
Mono<String> stream6 = Mono.justOrEmpty(null);
Mono<String> stream7 = Mono.justOrEmpty(Optional.empty());

Mono may be very useful for wrapping asynchronous operations such as HTTP requests or DB queries. For this purpose, Mono provides these methods—fromCallable(Callable), fromRunnable(Runnable), fromSupplier(Supplier), fromFuture(CompletableFuture), fromCompletionStage(CompletionStage), and others. We can wrap long HTTP requests in Mono with the following line of code:

Mono<String> stream8 = Mono.fromCallable(() -> httpRequest());

Alternatively, we can rewrite the preceding code to be even shorter with the Java 8 method reference syntax:

Mono<String> stream8 = Mono.fromCallable(this::httpRequest);

Note that the preceding code not only makes an HTTP request asynchronously (provided with an appropriate Scheduler) but also handles errors that may be propagated as the onError signal.

Both Flux and Mono allow the adaptation of any other Publisher instance with the from(Publisher<T> p) factory method.

Both reactive types have methods for creating handy and commonly used empty streams, as well as streams containing only an error:

Flux<String> empty = Flux.empty();
Flux<String> never = Flux.never(); Mono<String> error = Mono.error(new RuntimeException("Unknown id"));

Both Flux and Mono have factory methods called empty(), which generate empty instances of Flux or Mono respectively. Similarly, the never() method creates a stream that never signals completion, data, or error.

The error(Throwable) factory method creates a sequence that always propagates an error through the onError(...) method of each subscriber when it subscribes. The error is created during the Flux or Mono declaration and, consequently, each subscriber receives the same Throwable instance.

The defer factory method creates a sequence that decides its behavior at the moment of subscription and, consequently, may generate different data for different subscribers:

Mono<User> requestUserData(String sessionId) {
   return Mono.defer(() ->
      isValidSession(sessionId)
         ? Mono.fromCallable(() -> requestUser(sessionId))
         : Mono.error(new RuntimeException("Invalid user session")));
}

This code defers sessionId validation until the actual subscription happens. In contrast, the following code carries out validation when the requestUserData(...) method is called, which may be way before the actual subscription (also, no subscription may happen at all):

Mono<User> requestUserData(String sessionId) {
   return isValidSession(sessionId)
      ? Mono.fromCallable(() -> requestUser(sessionId))
      : Mono.error(new RuntimeException("Invalid user session"));
}

The first example validates the session each time someone subscribes to the returned Mono<User>. The second example carries out session validation, but only when the requestUserData method is called. However, no validation happens when a subscription does.

Summing this up, Project Reactor allows the creation of Flux and Mono sequences just by enumerating elements with the just method. We may easily wrap Optional into Mono with  justOrEmpty, or wrap Supplier into Mono with the fromSupplier method. We may map Future with the fromFuture method or Runnable with the fromRunnable factory method.  Also, we may translate an array or an Iterable collection to the Flux stream with the fromArray or fromIterable methods. As well as this, Project Reactor allows the creation of more complicated reactive sequences, which we are going to cover later in the chapter. Now, let's learn how to consume elements produced by a reactive stream.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.66.94