Understanding AsyncSubject

AsyncSubject only emits the last value of the source observable (Observable it listens on), and the last emission only. To say things more clearly, AsyncSubject will emit the last value it got, and will emit it only one time.

This is a marble diagram for AsyncSubject, which has been taken from ReactiveX documentation (http://reactivex.io/documentation/subject.html):

Let's consider the following code example:

    fun main(args: Array<String>) { 
      val observable = Observable.just(1,2,3,4)//1 
      val subject = AsyncSubject.create<Int>()//2 
      observable.subscribe(subject)//3 
      subject.subscribe({//4 
        //onNext 
        println("Received $it") 
      },{ 
        //onError 
        it.printStackTrace() 
      },{ 
        //onComplete 
        println("Complete") 
      }) 
      subject.onComplete()//5 
    } 

Here is the output:

Received 4
Complete

In this example, we created an example with Observable.just, with 4 integers (on comment 1). Then, on comment 2, we created an AsyncSubject example. After that, on comment 3 and 4, like the previous example, we subscribed to the observable instance with subject and then subscribed to the Subject instance with lambda; only this time, we passed all the three methods—onNext, onError, and onComplete.

On comment 6, we called onComplete.

As the output suggests, Subject only emitted the last value it got, that is, 4.

On Subject instances, you can pass values directly with the onNext method, without subscribing to any Observable. Recall the examples in the previous chapters where we used Subject (PublishSubject); there, we only used onNext to pass the values. You can subscribe to another Observable with Subject, or pass values with onNext. Basically, when you subscribe to Observable with SubjectSubject calls its onNext internally upon Observable's value emission.

Have doubts? Let's tweak the code a little. Instead of subscribing to an Observable, we will call onNext only to pass values, and will have another subscription. Here is the code, to do so:

    fun main(args: Array<String>) { 
      val subject = AsyncSubject.create<Int>() 
      subject.onNext(1) 
      subject.onNext(2) 
      subject.onNext(3) 
      subject.onNext(4) 
      subject.subscribe({ 
        //onNext 
        println("S1 Received $it") 
      },{ 
        //onError 
        it.printStackTrace() 
      },{ 
        //onComplete 
        println("S1 Complete") 
      }) 
      subject.onNext(5) 
      subject.subscribe({ 
        //onNext 
        println("S2 Received $it") 
      },{ 
        //onError 
        it.printStackTrace() 
      },{ 
        //onComplete 
        println("S2 Complete") 
      }) 
      subject.onComplete() 
    } 

Here is the output:

Here, we passed all values via onNext; it only emitted the last value it got (5) to both of the subscriptions. Look carefully, the 1st subscription was before passing the last value. As ConnectableObservable starts emitting on call of connect, AsyncSubject emits its only value on call of onComplete only.

Note that as the outputs suggest, AsyncSubject doesn't in an interleave manner, that is, it will replay its action multiple times to emit the value to multiple Observers, although it is only one value.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.214.27