Monitoring reactor schedulers

Because reactive streams usually operate on different reactor schedulers, it may be beneficial to track fine-grained metrics regarding their operation. In some cases, it is plausible to use a custom ScheduledThreadPoolExecutor with a manual metric instrumentation. For example, let's assume we have the following class:

public class MeteredScheduledThreadPoolExecutor
       extends ScheduledThreadPoolExecutor {                         // (1)

   public MeteredScheduledThreadPoolExecutorShort(
      int corePoolSize,
      MeterRegistry registry                                         // (2)
   ) {
      super(corePoolSize);
      registry.gauge("pool.core.size", this.getCorePoolSize());      // (3)
      registry.gauge("pool.active.tasks", this.getActiveCount());    //
      registry.gauge("pool.queue.size", this.getQueue().size());     //
   }
}

Here, the MeteredScheduledThreadPoolExecutor class extends an ordinary ScheduledThreadPoolExecutor (1) and additionally receives a MeterRegistry instance (2). In the constructor, we register a few gauges (3) to track the core size of the thread pool, the number of active tasks, and the queue size.

With a small modification, such executors can also track the number of successful and failed tasks as well as time spent executing these tasks. For such purposes, an executor's implementation  should also override beforeExecute() and afterExecute() methods.

Now we may use such an instrumented executor in the following reactive stream:

MeterRegistry meterRegistry = this.getRegistry();

ScheduledExecutorService executor =
    new MeteredScheduledThreadPoolExecutor(3, meterRegistry);

Scheduler eventsScheduler = Schedulers.fromExecutor(executor);

Mono.fromCallable(this::businessOperation)
    .subscribeOn(eventsScheduler)
    ....

Even though such an approach is pretty powerful, it does not cover built-in Schedulers like parallel() or elastic(). To instrument all reactor schedulers, we may use a custom Schedulers.Factory. For example, let's look at the following implementation of such a factory:

class MetersSchedulersFactory implements Schedulers.Factory {      // (1)
   private final MeterRegistry registry;

   public ScheduledExecutorService decorateExecutorService(        // (2)
      String type,                                                 // (2.1)
      Supplier<? extends ScheduledExecutorService> actual          // (2.2)
   ) {
      ScheduledExecutorService actualScheduler = actual.get();     // (3)
      String metric = "scheduler." + type + ".execution";          // (4)

      ScheduledExecutorService scheduledExecutorService =
                             new ScheduledExecutorService() {      // (5)
         public void execute(Runnable command) {                   // (6)
            registry.counter(metric, "tag", "execute")             // (6.1)
                    .increment();
            actualScheduler.execute(command);                      // (6.2)
         }

         public <T> Future<T> submit(Callable<T> task) {           // (7)
            registry.counter(metric, "tag", "submit")              // (7.1)
                    .increment();
            return actualScheduler.submit(task);                   // (7.2)
         }

         // other method overrides ...
      };

      registry.counter("scheduler." + type + ".instances")         // (8)
              .increment();
      return scheduledExecutorService;                             // (9)
   }
}

The preceding code is explained as the following:

  1. This is a custom implementation of the reactor's Schedulers.Factory class.
  2. Here, we have a declaration of the method that allows decorating a scheduled executor service where type (2.1) represents a type of a scheduler (parallel, elastic) and actual (2.2) holds a reference to the actual service to be decorated.
  3. At this point, there is an extraction of the actual executor service instance, as it is used in a decorator.
  4. This is the definition of the counter name that includes the type of Scheduler to increase metrics' readability.
  5. This is the declaration, the anonymous ScheduledExecutorService instance, that decorates the actual ScheduledExecutorService and provides additional behavior.
  6. This is the overridden execute(Runnable command) method, which executes MeterRegistry#counter with additional execute tag (6.1) and increments the number of collected metrics by one. Then, the delegate calls the actual method's implementation (6.2).
  7. Similar to the execute method, we override the Future<T> submit(Callable<T> task) method here. With each method's execution, call the MeterRegistry#counter method with an additional submit tag and finally increment the number of calls in the registry (7.1). After all actions, we delegate the method execution to the actual service (7.2).
  8. This is a declaration of the registering the count of created instances of the executor service.
  9. At this point, we finally return the decorated ScheduledExecutorService instance with an additional Micrometer metrics processing.

To use the MetersSchedulersFactory, we have to register the factory as follows:

Schedulers.setFactory(new MeteredSchedulersFactory(meterRegistry));

Now all Reactor schedulers (including custom schedulers defined on MeteredScheduledThreadPoolExecutor) are metered with Micrometer metrics. Such an approach gives plenty of flexibility but requires some discipline to prevent metrics gathering from becoming the most resource-demanding part of an application. For example, even though a meter (counter/timer/gauge) lookup in a meter registry should be fast, it is better to save a retrieved reference to a local variable or an instance field to reduce redundant lookups. Also, not all aspects of the application execution deserve a dedicated set of metrics. Usually, an application domain and using our common sense helps to select the essential application characteristics to monitor.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.147.27.131