Buffering, throttling, and debouncing

Here is one interesting example:

Path path = Paths.get("src", "main", "resources");
Observable<String> data = CreateObservable
  .listFolder(path, "*")
  .flatMap(file -> {
    if (!Files.isDirectory(file)) {
      return CreateObservable
        .from(file)
        .subscribeOn(Schedulers.io());
    }
    return Observable.empty();
  });
subscribePrint(data, "Too many lines");

This goes through all the files in a folder and reads each of them in parallel, provided it is not a folder itself. When I ran this example, the folder contained five text files, one of them quite large. Printing the content of these files with our subscribePrint() method produces something that looks like this:

Too many lines : Morbi nec nulla ipsum.
Too many lines : Proin eu tellus tortor.
Too many lines : Lorem ipsum dolor sit am
Error from Too many lines:
rx.exceptions.MissingBackpressureException
Too many lines : Vivamus non vulputate tellus, at faucibus nunc.
Too many lines : Ut tristique, orci eu
Too many lines : Aliquam egestas malesuada mi vitae semper.
Too many lines : Nam vitae consectetur risus, vitae congue risus.
Too many lines : Donec facilisis sollicitudin est non molestie.
  rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
  rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:721)
  rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:698)
  rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
  rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)

The output is cropped, but the important thing is that we get a MissingBackpressureException exception.

The threads reading each of the files are trying to push their data into the merge() operator (the flatMap() operator is implemented as merge(map(func))). The operator is struggling with a large amount of data, so it will try to notify the overproducing Observable instances to slow down (this ability to notify the upstream that the amount of data can't be handled is called backpressure). The problem is that they don't implement such a mechanism (backpressure), so the MissingBackpressureException exception is encountered.
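To make the failure mode concrete, here is a minimal plain-Java sketch of a fixed-capacity hand-off buffer sitting between a fast producer and a slow consumer. It is not RxJava code: the class name BoundedHandoff, its capacity, and the exception type are assumptions made for illustration, and the internal ring buffer used by merge() is not reproduced here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a fixed-capacity hand-off buffer; not RxJava's implementation.
final class BoundedHandoff<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedHandoff(int capacity) {
        this.capacity = capacity;
    }

    // Called by the producer. Fails when the consumer has fallen too far behind.
    void offer(T item) {
        if (queue.size() >= capacity) {
            throw new IllegalStateException("buffer full: producer ignores backpressure");
        }
        queue.add(item);
    }

    // Called by the consumer; returns null when nothing is waiting.
    T poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        BoundedHandoff<String> handoff = new BoundedHandoff<>(2);
        handoff.offer("line 1");
        handoff.offer("line 2");
        try {
            handoff.offer("line 3"); // the consumer has not polled anything yet
        } catch (IllegalStateException e) {
            System.out.println("Overflow: " + e.getMessage());
        }
    }
}
```

A producer that respected backpressure would check how much room is left before calling offer(); the producer in this sketch, like the file-reading Observable instances above, simply pushes.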

There are three ways to deal with such a situation: implementing backpressure in the upstream Observable instances, using one of the special onBackpressure* methods, or avoiding the problem by packaging the large number of incoming items into a smaller set of emissions. This packaging is done through buffering, dropping some of the incoming items, throttling (buffering using time intervals or events), and debouncing (buffering using the intervals between emissions of items).

Let's examine some of them.

Throttling

Using this mechanism, we can regulate the emission rate of an Observable instance. We can specify time intervals or another flow-controlling Observable instance to achieve this.

Using the sample() operator, we can control the emissions of an Observable instance using another one, or a time interval.

data = data
  .sample(
    Observable
      .interval(100L, TimeUnit.MILLISECONDS)
      .take(10)
      .concatWith(
        Observable
          .interval(200L, TimeUnit.MILLISECONDS)
      )
  );
subscribePrint(data, "Too many lines");

The sampling Observable instance emits every 100 milliseconds for the first second (ten emissions) and then begins emitting every 200 milliseconds. The data Observable instance drops all of its items until the sampling Observable instance emits. When this happens, the last item emitted by the data Observable instance is passed through. So we lose a lot of data, but it's harder to encounter the MissingBackpressureException exception (it is still possible to get it, though).
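The sampling rule described above can be modeled without RxJava. The following sketch works on precomputed timestamps (milliseconds since subscription) instead of real threads. The class SampleModel and its method are hypothetical names, and two simplifications are assumed: an item arriving exactly on a tick counts for that tick, and whatever is still pending when the source completes is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, timestamp-based model of sampling; not RxJava's implementation.
final class SampleModel {
    // items[i] arrives at itemTimes[i]; itemTimes and tickTimes are sorted ascending.
    static List<String> sample(long[] itemTimes, String[] items, long[] tickTimes) {
        List<String> out = new ArrayList<>();
        int next = 0;
        for (long tick : tickTimes) {
            String latest = null;
            // Consume everything that arrived since the previous tick, remembering only the last.
            while (next < items.length && itemTimes[next] <= tick) {
                latest = items[next++];
            }
            if (latest != null) {
                out.add(latest); // a tick with no new items emits nothing
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] itemTimes = {10, 40, 90, 250, 260};
        String[] items = {"a", "b", "c", "d", "e"};
        long[] ticks = {100, 200, 300};
        System.out.println(sample(itemTimes, items, ticks)); // prints [c, e]
    }
}
```

Items a, b, and d are lost, and the tick at 200 milliseconds emits nothing because no item arrived since the previous tick.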

The sample() operator has two additional overloads to which you can pass time intervals, a TimeUnit metric and, optionally, a Scheduler instance:

data = data.sample(
  100L,
  TimeUnit.MILLISECONDS
);

Using the sample() operator with a sampling Observable instance gives us more detailed control over the data flow. The throttleLast() operator is just an alias for the versions of the sample() operator that receive a time interval. The throttleFirst() operator is like the throttleLast() operator, but it emits the first item the source Observable instance emitted in each interval instead of the last. By default, these operators run on the computation scheduler.
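To contrast "first item in the interval" with "last item in the interval", here is a plain-Java sketch of one simplified reading of throttleFirst(). It assumes that the interval restarts at each emitted item, which is how I understand the operator to behave, but treat that as an assumption rather than a specification. ThrottleFirstModel is a hypothetical name and timestamps are in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, timestamp-based model of "emit the first item, then ignore the rest of the interval".
final class ThrottleFirstModel {
    static List<String> throttleFirst(long[] times, String[] items, long windowMillis) {
        List<String> out = new ArrayList<>();
        boolean emittedAny = false;
        long lastEmit = 0L;
        for (int i = 0; i < items.length; i++) {
            // Let an item through only if the previous emission is at least one window old.
            if (!emittedAny || times[i] - lastEmit >= windowMillis) {
                out.add(items[i]);
                lastEmit = times[i];
                emittedAny = true;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 30, 90, 120, 250};
        String[] items = {"a", "b", "c", "d", "e"};
        System.out.println(throttleFirst(times, items, 100L)); // prints [a, d, e]
    }
}
```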

These techniques, like most of the others in this section, are useful when you have many similar events. For example, if you want to capture and react to mouse-move events, you don't need all the events, containing all the pixel positions; you need only some of them.

Debouncing

In our previous example, debouncing won't work. Its idea is to emit only items that are not followed by other items for a given time interval. Therefore, some time must pass between emissions in order to propagate something. Because all of the items in our data Observable instances are emitted seemingly at once, there is no interval between them to use. So we need to change the example a bit in order to demonstrate this.

Observable<Object> sampler = Observable.create(subscriber -> {
  try {
    subscriber.onNext(0);
    Thread.sleep(100L);
    subscriber.onNext(10);
    Thread.sleep(200L);
    subscriber.onNext(20);
    Thread.sleep(150L);
    subscriber.onCompleted();
  }
  catch (Exception e) {
    subscriber.onError(e);
  }
}).repeat()
  .subscribeOn(Schedulers.computation());
data = data
  .sample(sampler)
  .debounce(150L, TimeUnit.MILLISECONDS);

Here we are using the sample() operator with a special sampling Observable instance so that emissions are separated by gaps of 100, 200, and 150 milliseconds. By using the repeat() operator, we create an infinite Observable instance, repeating the source, and set it to execute on the computation scheduler. Now we can use the debounce() operator to emit only those items that are followed by a gap of 150 milliseconds or more before the next emission.

Debouncing, like throttling, can be used to filter similar events from an overproducing source. A good example of this is an auto-complete search. We don't want to trigger a search on every letter the user types; we need to wait for them to stop typing and then trigger the search. We can use the debounce() operator for that and set a reasonable time interval. The debounce() operator has an overload that takes a Scheduler instance as its third argument. Additionally, there is one more overload with a selector returning an Observable instance for more fine-grained control over the data flow.
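The auto-complete case can be sketched in plain Java over precomputed keystroke timestamps. This is a model of the debouncing rule, not RxJava code. DebounceModel is a hypothetical name, and two assumptions are built in: a gap exactly equal to the timeout counts as long enough, and the final item is always emitted, as if the source completed after it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, timestamp-based model of debouncing; not RxJava's implementation.
final class DebounceModel {
    // Keeps items[i] only if the next item arrives at least timeoutMillis later.
    static List<String> debounce(long[] times, String[] items, long timeoutMillis) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            boolean isLast = (i == items.length - 1);
            if (isLast || times[i + 1] - times[i] >= timeoutMillis) {
                out.add(items[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The text of a search box after each keystroke, with the time of the keystroke.
        long[] times = {0, 80, 150, 600, 650, 700};
        String[] queries = {"r", "rx", "rxj", "rxja", "rxjav", "rxjava"};
        System.out.println(debounce(times, queries, 300L)); // prints [rxj, rxjava]
    }
}
```

Only two searches would be triggered: one after the pause that follows "rxj", and one when the user stops typing at "rxjava".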

The buffer and window operators

These two sets of operators are transforming operators, much like the map() or flatMap() operators. They transform a series of elements into a collection: a sequence of these elements that is emitted as one item.

This book will not cover these operators in detail, but it's worth mentioning that the buffer() operator has overloads that are able to collect emissions based on time intervals, selectors, and other Observable instances. It can be configured to skip items too. Here is an example with the buffer(int count, int skip) method, a version of the buffer() operator that collects count items per buffer and starts a new buffer every skip items, so when skip is larger than count, the items in between are skipped:

Observable<List<String>> buffered = data.buffer(2, 3000);
Helpers.subscribePrint(buffered, "Too many lines");

This will output something similar to the following:

Too many lines : ["Lorem ipsum dolor sit amet, consectetur adipiscing elit.", "Donec facilisis sollicitudin est non molestie."]
Too many lines : ["Integer nec magna ac ex rhoncus imperdiet.", "Nullam pharetra iaculis sem."]
Too many lines : ["Integer nec magna ac ex rhoncus imperdiet.", "Nullam pharetra iaculis sem."]
Too many lines : ["Nam vitae consectetur risus, vitae congue risus.", "Donec facilisis sollicitudin est non molestie."]
Too many lines : ["Sed mollis facilisis rutrum.", "Proin enim risus, congue id eros at, pharetra consectetur ex."]
Too many lines ended!
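The interplay of count and skip is easier to see on a small, finite input. The following plain-Java sketch models it over a list: a new buffer starts every skip items and holds up to count items. BufferModel is a hypothetical name, and the handling of a partial final buffer is an assumption of the model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical list-based model of buffer(count, skip); not RxJava's implementation.
final class BufferModel {
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        // A new buffer starts every `skip` items and holds at most `count` of them.
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(buffer(numbers, 2, 3)); // prints [[1, 2], [4, 5], [7, 8]]
    }
}
```

With count 2 and skip 3, every third item (3 and 6 here) falls between buffers and is dropped, which is the same effect buffer(2, 3000) has on the lines of the files, only on a much larger scale.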

The window() operator has the same set of overloads as the buffer() operator. The difference is that instead of lists of the buffered elements, the Observable instance created by the window() operator emits Observable instances emitting the collected elements.

In order to demonstrate a different overload, we'll present an example using the window(long timespan, long timeshift, TimeUnit units) method. This operator opens a new window every timeshift interval and collects the elements emitted during the timespan interval that follows; when timeshift is larger than timespan, the elements emitted between windows are skipped. This is repeated until the source Observable instance completes.

data = data
  .window(3L, 200L, TimeUnit.MILLISECONDS)
  .flatMap(o -> o);
subscribePrint(data, "Too many lines");

We use the flatMap() operator to flatten the Observable instances. The result consists of all the items emitted in the first three milliseconds of the subscription, plus the ones emitted in the first three milliseconds of every following 200-millisecond period, and this is repeated while the source is emitting.
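Because the windows are flattened again, the net effect is a time-based filter. A plain-Java sketch of that filter over precomputed timestamps (milliseconds since subscription) might look like this. WindowModel is a hypothetical name, the model assumes timespan is not larger than timeshift, and treating each window as half-open (start included, end excluded) is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, timestamp-based model of window(timespan, timeshift) followed by flattening.
final class WindowModel {
    static List<String> windowed(long[] times, String[] items, long timespan, long timeshift) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            // Position of the item inside the current timeshift period.
            long offset = times[i] % timeshift;
            if (offset < timespan) {
                out.add(items[i]); // arrived while a window was open
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 2, 3, 50, 199, 200, 201, 203, 401};
        String[] items = {"a", "b", "c", "d", "e", "f", "g", "h", "i"};
        System.out.println(windowed(times, items, 3L, 200L)); // prints [a, b, f, g, i]
    }
}
```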

The backpressure operators

The last set of operators that prevent the MissingBackpressureException exception activate automatically when the source Observable instance is overproducing.

The onBackpressureBuffer() operator buffers the items emitted by a source Observable instance that is faster than its Observer instance. The buffered items are then emitted in a way that the subscribers can handle them. For example:

Helpers.subscribePrint(
  data.onBackpressureBuffer(10000),
  "onBackpressureBuffer(int)"
);

Here we used a big capacity for the buffer because of the large number of elements, but note that overflowing this buffer will bring the MissingBackpressureException exception back.

The onBackpressureDrop() operator drops all the incoming items from the source Observable instance that cannot be handled by the subscribers.
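The dropping strategy can be modeled in plain Java with a simple demand counter: the consumer says how many items it is ready for, and anything that arrives beyond that is discarded. This is a sketch of the idea only. DropModel and its members are hypothetical names, and the real operator's threading concerns are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of a drop-on-overflow strategy; not RxJava's implementation.
final class DropModel {
    private long requested = 0L;
    private int dropped = 0;
    private final List<String> delivered = new ArrayList<>();

    // The consumer signals that it can handle n more items.
    void request(long n) {
        requested += n;
    }

    // The producer pushes an item regardless of demand.
    void onNext(String item) {
        if (requested > 0) {
            requested--;
            delivered.add(item);
        } else {
            dropped++; // no outstanding demand: the item is discarded
        }
    }

    List<String> delivered() {
        return delivered;
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DropModel model = new DropModel();
        model.request(2);
        model.onNext("a");
        model.onNext("b");
        model.onNext("c"); // dropped, demand is exhausted
        model.request(1);
        model.onNext("d");
        System.out.println(model.delivered() + ", dropped " + model.dropped()); // prints [a, b, d], dropped 1
    }
}
```

Compared with buffering, this never overflows, at the price of losing every item that arrives while the consumer is busy.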

There is a way to establish backpressure by implementing smart Observables or Subscribers, but this topic is beyond the scope of this book. There is an excellent article about backpressure and observables on the RxJava wiki: https://github.com/ReactiveX/RxJava/wiki/Backpressure. Many of the operators mentioned in this section are described there in depth, and there are marble diagrams available to help you understand the more complex ones.
