Understanding backpressure

The only problem with Observable is when an Observer cannot cope with the pace of an Observable. An Observable, by default, chains work by pushing items synchronously to the Observer, one at a time. However, if the observer has to perform some time-consuming computations, this may take longer than the interval of each item emission of Observable. Confused? Let's consider this example:

    fun main(args: Array<String>) { 
      val observable = Observable.just(1,2,3,4,5,6,7,8,9)//(1) 
      val subject = BehaviorSubject.create<Int>() 
      subject.observeOn(Schedulers.computation())//(2) 
            .subscribe({//(3) 
                println("Subs 1 Received $it") 
                runBlocking { delay(200) }//(4) 
            }) 
 
            subject.observeOn(Schedulers.computation())//(5) 
            .subscribe({//(6) 
                println("Subs 2 Received $it") 
             }) 
             observable.subscribe(subject)//(7) 
            runBlocking { delay(2000) }//(8) 
    } 

The code is quite simple. We created Observable on comment (1), then, we created BehaviorSubject, and then, on comment (3) and (6), we subscribe to BehaviorSubject. On comment (7), after subscribing to BehaviorSubject, we will use BehaviorSubject to subscribe to the Observable so that Observers of BehaviorSubject should get all the emissions. On comment (4), inside the first subscription, we used the delay method to simulate a time-taking subscriber. There is a new code on comment (2) and (6), subject.observeOn(Schedulers.computation()); we will discuss this method in detail in the later chapters, but, for now, just keep in mind that this observeOn method helps us specify a thread to run the subscription, and Scheduler.computation() provides us a with a thread to perform computations. On comment (8), we used the delay method to wait for the execution, as the execution will occur in the background.

Based on the knowledge we gathered from previous chapters, we can easily say that subscriptions should print all the numbers from 1-9 in an interleaved manner, or shouldn't they? Let's see the output first:

Shocked to see the output? Instead of working in an interleaved manner, subscription 2 completes printing all the numbers before subscription 1 prints even the second number, even though it starts printing first. So, why did it break the behavior of Hot Observables? Why didn't both the Observers work in an interleaved manner? Let's inspect. The program actually didn't break the behavior of Hot Observables, the subject actually emitted once for both of the observers; however, as for the first observer, each computation took long, the emissions got queued; and this is obviously not any good, as this could lead to a lot of problems, including the OutOfMemoryError exceptions.

Still have doubts? Let's look at another example:

    fun main(args: Array<String>) { 
      val observable = Observable.just(1,2,3,4,5,6,7,8,9)//(1) 
      observable 
         .map { MyItem(it) }//(2) 
         .observeOn(Schedulers.computation())//(3) 
         .subscribe({//(4) 
           println("Received $it") 
           runBlocking { delay(200) }//(5) 
          }) 
          runBlocking { delay(2000) }//(6) 
    } 
 
    data class MyItem (val id:Int) { 
      init { 
        println("MyItem Created $id")//(7) 
      } 
    } 

In this example, we eliminated the Subject and multiple Subscribers to make the program simpler and easier to understand. We have already introduced the map operator in the previous chapter that we used on comment (2) to convert the Int items to the MyItem object.

If you forgot the map operator from the previous chapter, it takes a source observable, processes items emitted by them on runtime, and creates another observable to observe on. Put simply, the map operator sits before subscribe to process each item emitted by observable before passing the new generated item to observer. We will also take a closer look at the map operator in the later chapters.

Here, we used it to keep track of each emission. Whenever an emission will occur, it will be passed instantly to the map operator, where we are creating an object of the MyItem class. In the init block of the MyItem class, we are printing the value passed to it; so, as soon as an item is emitted, it will be printed by the MyItem class.

Here, the MyItem class is a data class, that is, it will have the getter of val id and toString methods by default.

The remaining part of the program is almost the same; let's take a look at the output, then we will continue to discuss:

As we can see in the output, the creation of many MyItem, as known as emissions was quite fast, and completed even before the Observer as known as consumer can even start printing.

So, the problem is that the emissions get queued in the consumer, while the consumer is busy processing previous emissions by the producer.

A solution to this problem could be a feedback channel from consumer to producer, through which the consumer can tell the producer to wait until it completes processing the previous emission. This way, consumers or messaging middleware will not become saturated and unresponsive under high load; instead, they may request fewer messages, letting the producer decide how to slow down. This feedback channel is called backpressure. Backpressure is not supported in Observables and Observers, the solution could be using Flowables and Subscribers instead. Let's learn what those are.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.80.101