Implementing custom subscribers

If default subscribe(...) methods do not provide the required versatility, we may implement our own SubscriberWe can always directly implement the Subscriber interface from the Reactive Streams specification and subscribe it to the stream, as follows:

Subscriber<String> subscriber = new Subscriber<String>() {
   volatile Subscription subscription;                             // (1)

   public void onSubscribe(Subscription s) {                       // (2)
      subscription = s;                                            // (2.1)
log.info("initial request for 1 element"); // subscription.request(1); // (2.2) } public void onNext(String s) { // (3) log.info("onNext: {}", s); //
log.info("requesting 1 more element"); // subscription.request(1); // (3.1) } public void onComplete() { log.info("onComplete"); } public void onError(Throwable t) {
log.warn("onError: {}", t.getMessage());
} }; Flux<String> stream = Flux.just("Hello", "world", "!"); // (4) stream.subscribe(subscriber); // (5)

In our custom Subscriber implementation, we do the following:

  1. Our subscriber has to hold a reference to a Subscription that binds a Publisher and our Subscriber.  As subscription and data processing may happen in different threads, we use the volatile keyword to make sure that all threads will have the correct reference to the Subscription instance.
  2. When a subscription arrives, our Subscriber is informed with the onSubscribe callback. Here, we save the subscription (2.1) and request the initial demand (2.2). Without that request, a TCK complaint provider will not be allowed to send data and elements processing will not start at all.
  3. In the onNext callback, we log the received data and request the next element. In this case, we use a straightforward pull model (subscription.request(1)) for backpressure management.
  4. Here, we generate a simple stream with the just factory method.
  5. Here, we subscribe our custom subscriber to the Reactive Stream defined in step (4).

The preceding code should produce the following console output:

initial request for 1 element
onNext: Hello
requesting 1 more element
onNext: world
requesting 1 more element
onNext: !
requesting 1 more element
onComplete

However, the described approach for defining subscription is not right. It breaks the linear code flow and is also prone to errors. The hardest part is that we are required to manage backpressure on our own and correctly implement all TCK requirements for a subscriber. Moreover, in the preceding example, we broke a couple of TCK requirements regarding subscription validation and cancelation.

Instead, it is recommended to extend the BaseSubscriber class provided by Project Reactor. In this case, our subscriber may look as follows:

class MySubscriber<T> extends BaseSubscriber<T> {
   public void hookOnSubscribe(Subscription subscription) {
      log.info("initial request for 1 element");
      request(1);
   }

   public void hookOnNext(T value) {
      log.info("onNext: {}", value);
log.info("requesting 1 more element"); request(1); } }

Along with the hookOnSubscribe(Subscription) and hookOnNext(T) methods, we may override methods such as hookOnError(Throwable), hookOnCancel(), hookOnComplete(), and a handful of others. The BaseSubscriber class provides methods for granular control over the Reactive Stream demands with these methods—request(long) and requestUnbounded(). Also, with the BaseSubscriber class, it is much easier to implement TCK compliant subscribers. Such an approach may be desired when a subscriber itself holds precious resources with attentive life cycle management. For example, a subscriber may wrap a file handler or WebSocket connection to a third-party service.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.40.32