Blocking subscribers

Recall the code blocks from previous chapters in which we used delay to make the main thread wait whenever an Observable or Flowable operated on a different thread. Typical cases were using Observable.interval as a factory method and applying the subscribeOn operator. As a refresher, here is one such example:

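    // Assumes RxJava 2.x and kotlinx.coroutines are on the classpath
    import io.reactivex.Observable
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) {
        Observable.range(1, 10)
            .subscribeOn(Schedulers.computation())
            .subscribe { item -> println("Received $item") }
        // Keep the main thread alive long enough for the emissions to arrive
        runBlocking { delay(10) }
    }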

In this example, we switched to Schedulers.computation for the subscription. Now let's see how we can test this Observable and check that we received exactly 10 emissions:

    @Test
    fun `check emissions count`() {
        val emissionsCount = AtomicInteger() // (1)
        Observable.range(1, 10)
            .subscribeOn(Schedulers.computation())
            .blockingSubscribe { _ -> // (2)
                emissionsCount.incrementAndGet()
            }

        assertEquals(10, emissionsCount.get()) // (3)
    }

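Let's note the testing result first before digging into the code: the test passes, because Observable.range(1,10) emits exactly 10 items and the assertion runs only after all of them have been counted.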

There are a few things that need explanation in this code. The first one is AtomicInteger. AtomicInteger is a Java class that holds an int value and allows it to be updated atomically. Though AtomicInteger extends Number to allow uniform access by tools and utilities that deal with numerically based classes, it cannot be used as a replacement for Integer. We used AtomicInteger in this code as a precaution, so that the counter stays safe to update and read even though the items are produced on a thread of the computation scheduler rather than on the test thread.
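To make the atomicity point concrete, the following is a minimal sketch that uses only the standard library, with no RxJava involved. The helper name countWithAtomic and its parameters are hypothetical, introduced purely for illustration. Several threads increment one shared AtomicInteger, and no increment is lost:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper (not from the book): increments a shared counter
// `perThread` times on each of `threads` threads and returns the total.
fun countWithAtomic(threads: Int, perThread: Int): Int {
    val counter = AtomicInteger()
    // thread { ... } starts each worker immediately
    val workers = List(threads) {
        thread {
            repeat(perThread) { counter.incrementAndGet() }
        }
    }
    // Wait for every worker to finish before reading the result
    workers.forEach { it.join() }
    return counter.get()
}

fun main() {
    println(countWithAtomic(threads = 4, perThread = 10_000)) // prints 40000
}
```

With a plain Int variable in place of the AtomicInteger, concurrent increments could overwrite one another and the total could come out lower.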

The line that demands our attention is the one marked with comment (2), where we used blockingSubscribe instead of subscribe. When we subscribe to a producer with subscribe and the subscription does not run on the current thread, the current thread doesn't wait for the subscription to complete; it moves on to the next line immediately. That's why we used delay to make the current thread wait. Using delay inside tests is troublesome, because the right duration is a guess: too short and the assertion runs early, too long and the test suite slows down. blockingSubscribe, by contrast, blocks the current thread until the producer terminates (even if the subscription occurs on a separate thread) and invokes the callback on that blocked thread, which makes it useful when writing tests.
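The difference can be seen by putting the two calls side by side. This is a sketch only, assuming RxJava 2.x (the io.reactivex package); the function names countWithSubscribe and countWithBlockingSubscribe are hypothetical and do not come from the library:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.atomic.AtomicInteger

// Assumption: RxJava 2.x is on the classpath. Both helpers are illustrative.

// Non-blocking: subscribe returns at once, so the count may be read
// before the computation thread has delivered all (or any) items.
fun countWithSubscribe(): Int {
    val count = AtomicInteger()
    Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        .subscribe { _ -> count.incrementAndGet() }
    return count.get()
}

// Blocking: blockingSubscribe returns only after the source terminates,
// so every item has been counted by the time we read the value.
fun countWithBlockingSubscribe(): Int {
    val count = AtomicInteger()
    Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        .blockingSubscribe { _ -> count.incrementAndGet() }
    return count.get()
}

fun main() {
    println("subscribe: ${countWithSubscribe()}") // value depends on timing
    println("blockingSubscribe: ${countWithBlockingSubscribe()}") // 10
}
```

The first result is timing-dependent and can be anything from 0 to 10, which is exactly why an assertion placed after a plain subscribe is unreliable without an artificial delay.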
