Reactor Context

Another key feature that comes with Reactor is Context. Context is an interface that is passed along the stream. The central idea of the Context interface is to provide access to contextual information that may be useful later, at runtime. We may wonder why we need such a feature if we have ThreadLocal, which allows us to do the same thing. For example, many frameworks use ThreadLocal to pass a SecurityContext along with the user's request execution, making it possible to access the authorized user at any point of processing. Unfortunately, this concept works well only with single-threaded processing, where the execution stays attached to the same Thread. If we start using it with asynchronous processing, the ThreadLocal value will be lost very quickly. For instance, if we have an execution such as the following, we will lose the available ThreadLocal:

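import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ThreadLocalProblemShowcase {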

   public static void main(String[] args) {
      ThreadLocal<Map<Object, Object>> threadLocal =           // (1)
         new ThreadLocal<>();                                  //
      threadLocal.set(new HashMap<>());                        // (1.1)

      Flux                                                     // (2)
         .range(0, 10)                                         // (2.1)
         .doOnNext(k ->                                        //
            threadLocal                                        //
               .get()                                          //
               .put(k, new Random(k).nextGaussian())           // (2.2)
         )                                                     //
         .publishOn(Schedulers.parallel())                     // (2.3)
         .map(k -> threadLocal.get().get(k))                   // (2.4)
         .blockLast();                                         //
   }
}

The following is a description of the preceding code:

  1. Here, we declare the ThreadLocal instance. In addition, at point (1.1), we initialize that ThreadLocal so that we can use it later in the code.
  2. Here, we declare the Flux stream, which generates a range of elements from 0 to 9 (2.1). For each new element in the stream, we generate a random Gaussian double, using the element as the seed for the generated value. Once the number is generated, we put it in the map stored in the ThreadLocal (2.2). Then, at point (2.3), we move the execution to a different Thread. Finally, at point (2.4), we map each number in the stream to the random Gaussian double previously stored in the ThreadLocal map. At this point, we get a NullPointerException, because the map stored for the main Thread is unavailable on a different Thread.
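
The cause of the failure described in the list above can be isolated without Reactor. The following is a minimal JDK-only sketch, not part of the original showcase: the class name ThreadLocalLossSketch and the method visibleOnWorkerThread are hypothetical names chosen for illustration. It sets a ThreadLocal on the calling thread and then reads it from a worker thread, which plays the role of the thread that publishOn switches to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; it does not use Reactor at all.
class ThreadLocalLossSketch {

   // Plays the same role as threadLocal in the showcase.
   static final ThreadLocal<Map<Object, Object>> STORE = new ThreadLocal<>();

   // Sets the value on the calling thread, then checks whether a worker thread can see it.
   static boolean visibleOnWorkerThread() {
      STORE.set(new HashMap<>());   // bound to the calling thread only
      ExecutorService worker = Executors.newSingleThreadExecutor();
      try {
         return CompletableFuture
            .supplyAsync(() -> STORE.get() != null, worker)   // runs on the worker thread
            .join();
      } finally {
         worker.shutdown();
         STORE.remove();
      }
   }

   public static void main(String[] args) {
      // prints "visible on worker thread: false"
      System.out.println("visible on worker thread: " + visibleOnWorkerThread());
   }
}
```

The worker thread has its own, still empty, slot for STORE, so get() returns null there. Calling a method on that null value is what produces the NullPointerException at point (2.4).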

As we may notice from the preceding example, using ThreadLocal in a multi-threaded environment is very dangerous and may cause unpredictable behavior. Even though the Java API allows ThreadLocal data to be transferred from Thread to Thread, it does not guarantee that the data is transferred consistently everywhere.
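
One such JDK mechanism is InheritableThreadLocal, which copies the parent's value into a child thread at the moment the child is created. The following is a minimal sketch of why that is not enough for pooled threads; it assumes that this is the kind of transfer the paragraph above refers to, and the names InheritableThreadLocalSketch, readOn, and demo are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class built only on JDK types.
class InheritableThreadLocalSketch {

   static final InheritableThreadLocal<String> USER = new InheritableThreadLocal<>();

   // Reads USER on a thread of the given executor and returns what that thread sees.
   static String readOn(ExecutorService executor) {
      return CompletableFuture.supplyAsync(USER::get, executor).join();
   }

   // Returns { value seen by a thread started before set(), value seen by a thread started after set() }.
   static String[] demo() {
      ExecutorService warmPool = Executors.newSingleThreadExecutor();
      readOn(warmPool);               // forces the pool thread to start before the value is set
      USER.set("alice");
      ExecutorService freshPool = Executors.newSingleThreadExecutor();
      try {
         return new String[] { readOn(warmPool), readOn(freshPool) };
      } finally {
         warmPool.shutdown();
         freshPool.shutdown();
         USER.remove();
      }
   }

   public static void main(String[] args) {
      String[] seen = demo();
      System.out.println("thread started before set(): " + seen[0]);   // null
      System.out.println("thread started after set():  " + seen[1]);   // alice
   }
}
```

Because schedulers typically reuse long-lived worker threads, a value set after those threads were started never reaches them, which matches the inconsistency described above.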

Fortunately, Reactor Context solves that problem in the following way:

Flux.range(0, 10)                                               //
    .flatMap(k ->                                               //
       Mono.subscriberContext()                                 // (1)
           .doOnNext(context -> {                               // (1.1)
              Map<Object, Object> map = context.get("randoms"); // (1.2)
              map.put(k, new Random(k).nextGaussian());         //
           })                                                   //
           .thenReturn(k)                                       // (1.3)
    )                                                           //
    .publishOn(Schedulers.parallel())                           //
    .flatMap(k ->                                               //
       Mono.subscriberContext()                                 // (2)
           .map(context -> {                                    //
              Map<Object, Object> map = context.get("randoms"); // (2.1)
              return map.get(k);                                // (2.2)
           })                                                   //
    )                                                           //
    .subscriberContext(context ->                               // (3)
       context.put("randoms", new HashMap<>())                  //
    )                                                           //
    .blockLast();                                               //

The following is a description of the preceding code:

  1. This point shows how we may access Reactor's Context. As we may see, Reactor provides access to the instance of Context in the current stream through the static operator subscriberContext. As in the previous sample, once the Context is obtained (1.1), we access the stored Map (1.2) and put the generated value there. Finally, we return the initial parameter of flatMap (1.3).
  2. Here, we access Reactor's Context once again, after we have switched the Thread. Even though this sample follows the same logic as the previous one, where we used ThreadLocal, at point (2.1) we successfully retrieve the stored map and get the generated random Gaussian double (2.2).
  3. Finally, to make the "randoms" key return a Map, we populate the upstream with a new Context instance that contains a Map under the required key.

What we may see from the preceding example is that Context is accessible through the no-argument Mono.subscriberContext operator and can be provided to the stream using the one-argument subscriberContext operator, which accepts either a Context instance or, as in the sample, a function that transforms the current Context.

Looking at the preceding sample, we may wonder why we need to use a Map to transfer data, since the Context interface has methods similar to those of the Map interface. By its nature, Context is designed as an immutable object, and once we add a new element to it, we get a new instance of Context. Consequently, a value put into a Context from inside the stream would not be visible in the Context instance that other operators already hold, which is why the sample stores a mutable Map under a fixed key. This design decision was made in favor of a multi-threaded access model. It means that the only way to provide a Context to the stream, along with data that will be available during the whole runtime execution, is during assembly time or subscription time. If the Context is provided during assembly time, all subscribers share the same static context, which may not be useful in cases where each Subscriber (which may represent a user connection) should have its own Context. Therefore, the only life-cycle period during which each Subscriber may be provided with its own context is subscription time.
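
The immutability described above can be illustrated with a small copy-on-write sketch. The class ImmutableContextSketch below is a hypothetical stand-in written only for this illustration; it is not Reactor's Context, although its method names (put, get, hasKey) deliberately mirror it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; this is NOT Reactor's Context class.
final class ImmutableContextSketch {

   private final Map<Object, Object> entries;

   private ImmutableContextSketch(Map<Object, Object> entries) {
      this.entries = Collections.unmodifiableMap(entries);
   }

   static ImmutableContextSketch empty() {
      return new ImmutableContextSketch(new HashMap<>());
   }

   // Copy-on-write: the receiver is left untouched and a new instance is returned.
   ImmutableContextSketch put(Object key, Object value) {
      Map<Object, Object> copy = new HashMap<>(entries);
      copy.put(key, value);
      return new ImmutableContextSketch(copy);
   }

   boolean hasKey(Object key) {
      return entries.containsKey(key);
   }

   @SuppressWarnings("unchecked")
   <T> T get(Object key) {
      return (T) entries.get(key);
   }

   public static void main(String[] args) {
      ImmutableContextSketch base = empty().put("randoms", new HashMap<Object, Object>());
      ImmutableContextSketch derived = base.put("user", "alice");

      System.out.println(base.hasKey("user"));      // false: base was not modified
      System.out.println(derived.hasKey("user"));   // true

      // Both instances reference the same mutable map under "randoms",
      // so data written into that map at runtime is visible through either of them.
      Map<Object, Object> randoms = base.get("randoms");
      randoms.put(1, 0.5);
      Map<Object, Object> sameMap = derived.get("randoms");
      System.out.println(sameMap.get(1));           // 0.5
   }
}
```

The sketch also shows the trade-off of the Map-based approach: the context itself stays immutable, while the map stored inside it is shared mutable state that the caller is responsible for.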

As we may remember from the previous sections, during subscription time the Subscriber is hoisted from the bottom of the stream to the top through the chain of Publishers and is wrapped at each stage in a local Subscriber representation that introduces additional runtime logic. To keep that process unchanged while allowing an additional Context object to be passed through the stream, Reactor uses a specific extension of the Subscriber interface called CoreSubscriber. CoreSubscriber allows the Context to be transferred as its field. The following shows what the CoreSubscriber interface looks like:

interface CoreSubscriber<T> extends Subscriber<T> {
   default Context currentContext() {
      return Context.empty();
   }
}

As we can see from the preceding code, CoreSubscriber introduces an additional method called currentContext, which gives access to the current Context object. Most of the operators in Project Reactor provide an implementation of the CoreSubscriber interface that refers to the downstream Context. As we may notice, the only operator that allows the current Context to be modified is subscriberContext, whose implementation of CoreSubscriber holds the Context obtained by merging the downstream Context with the one passed as a parameter.
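
The delegation described above can be sketched with plain JDK types. Everything in the following example is hypothetical and simplified: ContextAware models only the currentContext accessor of CoreSubscriber, a Map stands in for Context, and passThrough and withEntry are illustrative names rather than Reactor operators.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ContextPropagationSketch {

   // Hypothetical, simplified stand-in for CoreSubscriber: only the context accessor is modeled.
   interface ContextAware {
      default Map<String, String> currentContext() {
         return Collections.emptyMap();
      }
   }

   // What most operators do: expose the downstream subscriber's context unchanged.
   static ContextAware passThrough(ContextAware downstream) {
      return new ContextAware() {
         @Override
         public Map<String, String> currentContext() {
            return downstream.currentContext();
         }
      };
   }

   // What a context-writing operator does: merge its own entry into the downstream context.
   static ContextAware withEntry(ContextAware downstream, String key, String value) {
      Map<String, String> merged = new HashMap<>(downstream.currentContext());
      merged.put(key, value);
      Map<String, String> frozen = Collections.unmodifiableMap(merged);
      return new ContextAware() {
         @Override
         public Map<String, String> currentContext() {
            return frozen;
         }
      };
   }

   public static void main(String[] args) {
      ContextAware terminal = new ContextAware() { };   // final subscriber with an empty context
      ContextAware bottom = withEntry(terminal, "bottom", "context");
      ContextAware middle = withEntry(passThrough(bottom), "middle", "context");

      System.out.println(terminal.currentContext().size());   // 0
      System.out.println(bottom.currentContext().size());     // 1
      System.out.println(middle.currentContext().size());     // 2
   }
}
```

Each wrapper sees everything that was contributed below it and nothing that is added above it, because the wrapping proceeds from the final subscriber upward.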

Such behavior also means that the accessible Context object may be different at different points in the stream. For example, the following code shows this behavior:

void run() {
   printCurrentContext("top")
   .subscriberContext(Context.of("top", "context"))
   .flatMap(__ -> printCurrentContext("middle"))
   .subscriberContext(Context.of("middle", "context"))
   .flatMap(__ -> printCurrentContext("bottom"))
   .subscriberContext(Context.of("bottom", "context"))
   .flatMap(__ -> printCurrentContext("initial"))
   .block();
}
void print(String id, Context context) {
   ...
}
Mono<Context> printCurrentContext(String id) {
   return Mono
      .subscriberContext()
      .doOnNext(context -> print(id, context));
}

The preceding code shows how we may use Context during stream construction. If we run this code, the following result appears in the console:

top {
  Context3{bottom=context, middle=context, top=context}
}

middle {
  Context2{bottom=context, middle=context}
}

bottom {
  Context1{bottom=context}
}

initial {
  Context0{}
}

As we may see from the preceding output, the Context available at the top of the stream contains all of the Context entries available in this stream, the middle can access only those entries that are defined further down the stream, and the context consumer at the very bottom (with the id initial) has an empty Context.

In general, Context is a killer feature that pushes Project Reactor to the next level of tools for building reactive systems. In addition, this feature is useful in many cases where we need to access some contextual data, for instance, in the middle of processing a user's request. As we are going to see in Chapter 6, WebFlux Async Non-Blocking Communication, this feature is used extensively within the Spring Framework, especially within reactive Spring Security.

Even though we have covered the Context feature extensively, there are many more possibilities and use cases for this Reactor technique. To learn more about Reactor's Context, please see the following section of the Project Reactor documentation: http://projectreactor.io/docs/core/release/reference/#context.
