© Fu Cheng 2018

Fu Cheng, Exploring Java 9, https://doi.org/10.1007/978-1-4842-3330-6_7

7. Reactive Streams

Fu Cheng

(1)Auckland, New Zealand

Reactive programming ( https://en.wikipedia.org/wiki/Reactive_programming ) has gained popularity in the Java community recently. In the Java world, we have popular reactive libraries like RxJava ( https://github.com/ReactiveX/RxJava ) and Reactor ( http://projectreactor.io/ ). Reactive Streams ( http://www.reactive-streams.org/ ) is an initiative to provide a standard for asynchronous stream processing with nonblocking back pressure. Core interfaces from the Reactive Streams specification have been added to Java 9 in the class java.util.concurrent.Flow.

Core Interfaces

This section covers the four core interfaces in Flow.

Flow.Publisher<T>

The interface Flow.Publisher<T> represents a producer of items to be received by subscribers. It only has one method, void subscribe(Flow.Subscriber<? super T> subscriber), which adds subscribers of type Flow.Subscriber. There are three types of notifications that a publisher can publish; see Table 7-1.

Table 7-1. Notifications of Publisher

Notification

Description

onNext

A new item has been published.

onComplete

No further notifications will be published.

onError

An error has been encountered when publishing items.

Both onComplete and onError are terminal notifications meaning that no more notifications will be published after an onComplete or onError notification. It’s valid for a publisher to not publish any onNext notifications. It’s also possible for a publisher to publish an infinite number of onNext notifications.

Flow.Subscriber<T>

The interface Flow.Subscriber<T> represents a receiver of notifications published by a Flow.Publisher. It has four methods; see Table 7-2.

Table 7-2. Methods of Flow.Subscriber

Method

Description

void onNext(T item)

Invoked when an item is received

void onComplete()

Invoked when an onComplete notification is received

void onError(Throwable throwable)

Invoked when an onError notification is received

void onSubscribe(Flow.Subscription subscription)

Invoked when the subscriber successfully subscribes to the publisher

If a subscriber cannot subscribe to a publisher, the method onError() is invoked with the error.

Flow.Subscription

The interface Flow.Subscriptionrepresents the subscription when a Flow.Subscriber successfully subscribes to a Flow.Publisher. It’s passed as an argument to the subscriber’s method onSubscribe(). Table 7-3 shows the methods of Flow.Subscription.

Table 7-3. Methods of Flow.Subscription

Method

Description

void request(long n)

Request n items to be published.

void cancel()

Cancel the subscription.

A subscriber only receives items after requesting them. The method request(long n) makes a demand of the publisher to indicate that the subscriber is ready to handle n items. This is important because it makes sure the subscriber is not overwhelmed by the publisher. The subscriber may receive fewer items than requested if the publisher terminates earlier. After the cancel() method is invoked, the subscriber may still receive additional notifications, but the subscription will be eventually cancelled.

Flow.Processor<T,R>

The interface Flow. Processorrepresents a component that acts as both a subscriber and a publisher. Listing 7-1 shows its definition. It simply extends from both Subscriber<T> and Publisher<R>.

Listing 7-1. Definition of Flow.Processor
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

SubmissionPublisher

Although the four core interfaces are easy to understand, it’s not easy to correctly implement them. However, you don’t actually need to care about the implementations; you can simply rely on third-party libraries.

The class java.util.concurrent.SubmissionPublisher<T> is an implementation of Flow.Publisher that asynchronously publishes submitted items to subscribers until it’s closed. The items of SubmissionPublisher are not generated by the publisher but are submitted by the client code. SubmissionPublisher can function as a bridge client code can use to communicate with subscribers.

SubmissionPublisher uses an Executor to send items to subscribers. The Executor can be provided in the constructor, or ForkJoinPool.commonPool() is used by default. When any subscriber method throws an error, you can provide a handler to handle the exception before the subscription is cancelled. SubmissionPublisher has three constructors.

  • The no-arg constructor uses the default Executor and no exception handler.

  • SubmissionPublisher(Executor executor, int maxBufferCapacity) configures the Executor and the maximum size of the buffer. The buffer contains submitted but not consumed items for a subscriber. Each subscriber has its own independent buffer.

  • SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler) configures the Executor, the maximum size of the buffer, and the exception handler.

Note

The actual value of the maximum buffer size may be rounded up to the nearest power of two or bounded by the largest value supported by the implementation. For example, if 10 is passed to the constructor as the value of maxBufferCapacity, the actual used value is 16. We can use the method getMaxBufferCapacity() to get the actual value.

Since subscribers may have different speeds at which they consume published items, it’s possible that some items have not yet been consumed by all subscribers. The method estimateMaximumLag() returns an estimate of the maximum number of items that have been published but have not yet been consumed by all subscribers. On the other hand, it’s also possible that the speed at which items are published cannot keep up with the consumption speed of the subscribers. In this case, the method estimateMinimumDemand() returns an estimate of the minimum number of items that have been requested but not yet published.

To submit items to SubmissionPublisher, you can use the submit() and offer() methods. The method submit(T item) publishes the item to all subscribers asynchronously by invoking their onNext() methods. submit() blocks uninterruptibly while there is no resource available for any subscriber to handle. You should use submit() when you want to make sure the items are published to all subscribers successfully. However, its blocking behavior may have a performance penalty. An overwhelmed subscriber may slow down the processing of other subscribers. The return value of submit() is the estimate of the maximum lag and is the same as the return value of the method estimateMaximumLag().

The methods int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) and int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) also publish the items to all subscribers, but they allow items to be dropped when resources are not available for one or more subscribers. When an item is dropped, the provided handler onDrop is invoked with the current subscriber and the dropped item. The type of the handler is BiPredicate, so it returns a Boolean value. If the return value of the handler is true, the publisher will try to publish the item again. The return value of offer() indicates the result of publishing. If the value is negative, its absolute value represents the number of drops; otherwise, it represents an estimate of the maximum lag with the same meaning as the value submit() returned. If a timeout is added when offer() is invoked, it blocks while no resource is available for any subscriber and continues to block up to the specified timeout or until the caller thread is interrupted. The onDrop handler is invoked in both cases.

Table 7-4 shows other important methods of SubmissionPublisher.

Table 7-4. Other Methods of SubmissionPublisher

Method

Description

CompletableFuture<Void> consume(Consumer<? super T> consumer)

Processes all published items using the given consumer. The return value is a CompletableFuture<Void> that is completed normally when the publisher signals onComplete or completed exceptionally upon any error.

void close()

Closes the publisher and signals onComplete to all subscribers.

void closeExceptionally(Throwable error)

Closes the publisher and signals onError to all subscribers with the provided error.

boolean isClosed()

Checks if the publisher is closed.

List<Flow.Subscriber<? super T>> getSubscribers()

Returns a list of all subscribers.

int getNumberOfSubscribers()

Returns the number of subscribers.

boolean hasSubscribers()

Checks if the publisher has any subscriber.

boolean isSubscribed(Flow.Subscriber<? super T> subscriber)

Checks if the given subscriber is subscribed.

Now you can use SubmissionPublisher to publish some data. In Listing 7-2, the class PeriodicPublisher periodically publishes the given number of items and closes when all items are published. The constructor parameter Consumer<PeriodicPublisher<T>> action is used to publish the items. The method waitForCompletion() can be used to wait for all items to be published.

Listing 7-2. PeriodicPublisher
public class PeriodicPublisher<T> extends SubmissionPublisher<T> {

  private final ScheduledExecutorService scheduler;
  private final ScheduledFuture<?> periodicTask;
  private final AtomicInteger count = new AtomicInteger(0);
  private final CountDownLatch closeLatch = new CountDownLatch(1);


  public PeriodicPublisher(final Consumer<PeriodicPublisher<T>> action,
      final int maxBufferCapacity,
      final int total,
      final long period,
      final TimeUnit timeUnit) {
    super(ForkJoinPool.commonPool(),
        maxBufferCapacity,
        ((subscriber, throwable) ->
            System.out.printf("Publish error for %s: %s",
                subscriber, throwable)));
    this.scheduler = new ScheduledThreadPoolExecutor(1);
    this.periodicTask = this.scheduler.scheduleAtFixedRate(
        () -> {
          action.accept(this);
          final int value = this.count.incrementAndGet();
          if (value >= total) {
            this.doClose();
          }
        }, 0, period, timeUnit);
  }


  @Override
  public void close() {
    this.periodicTask.cancel(false);
    this.scheduler.shutdown();
    super.close();
  }


  public void waitForCompletion() {
    try {
      this.closeLatch.await();
    } catch (final InterruptedException e) {
      this.close();
    }
  }


  private void doClose() {
    try {
      this.close();
    } finally {
      this.closeLatch.countDown();
    }
  }
}

You can use PeriodicPublisher to create a random number generator; see Listing 7-3. The action passed in the constructor is publisher.submit(ThreadLocalRandom.current().nextLong(), which uses the method submit() to publish a random number.

Listing 7-3. Random Number Generator
public class RandomNumberGenerator extends PeriodicPublisher<Long> {

  public RandomNumberGenerator() {
    super((publisher) ->
            publisher.submit(ThreadLocalRandom.current().nextLong()),
        Flow.defaultBufferSize(),
        10,
        1,
        TimeUnit.SECONDS);
  }


  public static void main(final String[] args) {
    final RandomNumberGenerator generator = new RandomNumberGenerator();
    generator.subscribe(new Flow.Subscriber<>() {
      @Override
      public void onSubscribe(final Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
      }


      @Override
      public void onNext(final Long item) {
        System.out.printf("Received: %s%n", item);
      }


      @Override
      public void onError(final Throwable throwable) {
        throwable.printStackTrace();
      }


      @Override
      public void onComplete() {
        System.out.println("Completed!");
      }
    });
    generator.waitForCompletion();
  }
}

Now I am going to show you the difference between submit() and offer(). In Listing 7-4, I use three different methods to publish items. The DelayedSubscriber class delays 100ms before it processes each item. The publisher uses a buffer with a maximum size of 16. A total number of 50 sequence numbers are published with an interval of 50ms. I use waitForCompletion() to wait for publishing to complete, then wait another 5 seconds to allow buffered items to be processed. When I publish items using submit(), all items can be processed. When I publish items using offer() without a timeout, some of the items are dropped. This is because the buffer size is only 16 and it takes 100ms to process a single item. During that time, two items are generated. The buffer fills up before all items can be processed, and some of these items are dropped. The drop handler of offer() returns true, so the publisher tries to publish again. When you run the code, due to its concurrent nature, the dropped items may be different. When I publish items using offer() with a timeout of one second, because the timeout is much longer than the processing time, all items can be processed.

Listing 7-4. Differences Between submit() and offer()
public class DelayedSubscribers {

  public static void main(final String[] args) {
    final DelayedSubscribers delayedSubscribers = new DelayedSubscribers();
    delayedSubscribers.publishWithSubmit();
    System.out.println("===========");
    delayedSubscribers.publishWithOffer();
    System.out.println("===========");
    delayedSubscribers.publishWithOfferTimeout();
  }


  public void publishWithSubmit() {
    final SequenceGenerator sequenceGenerator = new SequenceGenerator();
    this.publish(publisher -> publisher.submit(sequenceGenerator.get()));
  }


  public void publishWithOffer() {
    final SequenceGenerator sequenceGenerator = new SequenceGenerator();
    this.publish(publisher -> publisher.offer(sequenceGenerator.get(),
        ((subscriber, value) -> {
          System.out.printf("%s dropped %s%n", subscriber, value);
          return true;
        })));
  }


  public void publishWithOfferTimeout() {
    final SequenceGenerator sequenceGenerator = new SequenceGenerator();
    this.publish(publisher ->
        publisher.offer(
            sequenceGenerator.get(),
            1000,
            TimeUnit.MILLISECONDS,
            ((subscriber, value) -> {
              System.out.printf("%s dropped %s%n", subscriber, value);
              return true;
            })
        ));
  }


  private void publish(final Consumer<PeriodicPublisher<Integer>> action) {
    final PeriodicPublisher<Integer> publisher =
        new PeriodicPublisher<>(
            action,
            16,
            50,
            50,
            TimeUnit.MILLISECONDS);
    publisher.subscribe(new DelayedSubscriber<>("1"));
    publisher.subscribe(new DelayedSubscriber<>("2"));
    publisher.subscribe(new DelayedSubscriber<>("3"));
    publisher.waitForCompletion();
    System.out.println("Publish completed");
    try {
      Thread.sleep(5000);
    } catch (final InterruptedException e) {
      e.printStackTrace();
    }
  }


  public static class SequenceGenerator implements Supplier<Integer> {

    private int count = 1;

    @Override
    public Integer get() {
      return this.count++;
    }
  }


  public static class DelayedSubscriber<T> implements Flow.Subscriber<T> {

    private final String id;
    private Flow.Subscription subscription;


    public DelayedSubscriber(final String id) {
      this.id = id;
    }


    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
      this.subscription = subscription;
      System.out.printf("%s subscribed!%n", this.id);
      subscription.request(1);
    }


    @Override
    public void onNext(final T item) {
      this.subscription.request(1);
      try {
        Thread.sleep(100);
      } catch (final InterruptedException e) {
        e.printStackTrace();
      }
      System.out.printf("%s processed: %s%n", this.id, item);
    }


    @Override
    public void onError(final Throwable throwable) {
      throwable.printStackTrace();
    }


    @Override
    public void onComplete() {
      System.out.printf("%s completed!%n", this.id);
    }


    @Override
    public String toString() {
      return String.format("Subscriber %s", this.id);
    }
  }
}

Third-Party Libraries

If you have used RxJava or Reactor before, you’ll find out that by comparison, the interfaces provided by Flow are quite minimal. The Reactive Streams specification is designed to be minimal and it focuses on the interoperability between different libraries. You can integrate Flow with RxJava and Reactor.

RxJava 2

You can use the library RxJava2Jdk9Interop ( https://github.com/akarnokd/RxJava2Jdk9Interop ) for the conversion. Listing 7-5 shows you how to do the conversion between RxJava 2 Flowable and Flow.

Listing 7-5. RxJava 2 and Flow
import hu.akarnokd.rxjava2.interop.FlowInterop;
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;


public class RxJava2 {

  public void toFlow() {
    Flowable.interval(0, 50, TimeUnit.MILLISECONDS)
        .take(50)
        .to(FlowInterop.toFlow())
        .subscribe(new DelayedSubscribers.DelayedSubscriber<>("1"));
  }


  public void fromFlow() {
    final DelayedSubscribers.SequenceGenerator sequenceGenerator =
        new DelayedSubscribers.SequenceGenerator();
    final PeriodicPublisher<Integer> publisher =
        new PeriodicPublisher<>(
            pub -> pub.submit(sequenceGenerator.get()),
            16,
            50,
            50,
            TimeUnit.MILLISECONDS);
    FlowInterop.fromFlowPublisher(publisher)
        .map(v -> v * 10)
        .forEach(System.out::println);
  }


  public static void main(final String[] args) {
    final RxJava2 rxJava2 = new RxJava2();
    rxJava2.toFlow();
    rxJava2.fromFlow();
    try {
      Thread.sleep(10000);
    } catch (final InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Reactor

Reactor has a built-in adapter for Flow. Listing 7-6 shows how to do the conversion between Reactor Flux and Flow.

Listing 7-6. Reactor and Flow
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.publisher.Flux;


public class Reactor {

  public void toFlow() {
    JdkFlowAdapter.publisherToFlowPublisher(
        Flux.interval(Duration.ZERO, Duration.ofMillis(50))
            .take(50)
    ).subscribe(new DelayedSubscribers.DelayedSubscriber<>("1"));
  }


  public void fromFlow() {
    final DelayedSubscribers.SequenceGenerator sequenceGenerator =
        new DelayedSubscribers.SequenceGenerator();
    final PeriodicPublisher<Integer> publisher =
        new PeriodicPublisher<>(
            pub -> pub.submit(sequenceGenerator.get()),
            16,
            50,
            50,
            TimeUnit.MILLISECONDS);
    JdkFlowAdapter.flowPublisherToFlux(publisher)
        .map(v -> v * 10)
        .subscribe(System.out::println);
  }


  public static void main(final String[] args) {
    final Reactor reactor = new Reactor();
    reactor.toFlow();
    reactor.fromFlow();
    try {
      Thread.sleep(10000);
    } catch (final InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Interoperability

Listing 7-7 shows you how to use Flow to convert an RxJava 2 Flowable to a Reactor Flux.

Listing 7-7. Interoperability between RxJava 2 and Reactor
import hu.akarnokd.rxjava2.interop.FlowInterop;
import io.reactivex.Flowable;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import reactor.adapter.JdkFlowAdapter;


public class RxInterop {

  public static void main(final String[] args) {
    final Flow.Publisher<Long> publisher =
        Flowable.interval(0, 50, TimeUnit.MILLISECONDS)
            .take(50)
            .to(FlowInterop.toFlow());
    JdkFlowAdapter.flowPublisherToFlux(publisher)
        .map(v -> v * 10)
        .toStream()
        .forEach(System.out::println);
  }
}

Summary

The introduction of the Reactive Streams specification to Java 9 is a big step toward embracing reactive programming principles in the Java platform. In this chapter, we discussed core interfaces in Flow and the built-in SubmissionPublisher. I also demonstrated the interoperability with the popular reactive libraries RxJava 2 and Reactor. In the next chapter, we’ll discuss variable handles.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.173.199