AsyncSubject

The last on the list is the AsyncSubject. It works quite differently from the previous Subjects because it emits only a single value: the last one it received from the upstream Observable. It waits for the upstream Observable to complete, and only then emits that item to its subscribers.

Again, let's look at an example:

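// The Subject observes the upstream and is itself observable by subscribers.
Subject<String> subject = AsyncSubject.create();

// Emits 0, 1, 2, 3 (one per second) as strings, then completes.
Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(4)
        .map(Objects::toString)
        .subscribe(subject);

// Receives only the last item, and only after the upstream completes.
subject.subscribe(v -> log(v));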

Here, the upstream Observable is created by this:

Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(4)
        .map(Objects::toString)

It emits four items (0, 1, 2, and 3) and then completes. After this runs, we will see just this:

RxComputationThreadPool-1:3

This is the last value that the source Observable emitted before it completed. Now let's subscribe to the AsyncSubject twice, as shown here:

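Subject<String> subject = AsyncSubject.create();

Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(4)
        .map(Objects::toString)
        .subscribe(subject);

// First subscriber: attached before the upstream completes.
subject.subscribe(v -> log(v));

// Wait until the upstream has certainly completed (about 3 seconds in).
Thread.sleep(5100);

// Second subscriber: attached after completion.
subject.subscribe(v -> log(v));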

The output will be as follows. The second subscriber is attached after the AsyncSubject has already completed, so it immediately receives the stored last value on the calling thread (main) and then completes:

RxComputationThreadPool-1:3
main:3
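
To make the mechanics behind both runs more concrete, here is a minimal plain-Java sketch of the same idea: remember the latest value, release it only on completion, and replay it to anyone who subscribes afterward. MiniAsyncSubject and AsyncSubjectSketch are hypothetical names invented for this illustration. The sketch is single-threaded and ignores errors and disposal, so it should be read as a rough model of the behavior rather than as how RxJava implements AsyncSubject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class AsyncSubjectSketch {

    // Hypothetical stand-in for illustration only; this is NOT RxJava's AsyncSubject.
    static final class MiniAsyncSubject<T> {
        private final List<Consumer<T>> waiting = new ArrayList<>();
        private T last;            // most recent upstream value
        private boolean hasValue;  // false while nothing has been received
        private boolean completed;

        // Upstream emitted a value: remember it, but do not forward it yet.
        void onNext(T value) {
            if (completed) return;
            last = value;
            hasValue = true;
        }

        // Upstream completed: hand the last value to everyone who was waiting.
        void onComplete() {
            if (completed) return;
            completed = true;
            if (hasValue) {
                for (Consumer<T> subscriber : waiting) subscriber.accept(last);
            }
            waiting.clear();
        }

        // Early subscribers wait; late subscribers get the stored value at once.
        void subscribe(Consumer<T> subscriber) {
            if (!completed) {
                waiting.add(subscriber);
            } else if (hasValue) {
                subscriber.accept(last);
            }
        }
    }

    // Mirrors the two-subscriber example: one before completion, one after.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        MiniAsyncSubject<String> subject = new MiniAsyncSubject<>();

        subject.subscribe(v -> seen.add("first:" + v));
        for (int i = 0; i < 4; i++) {
            subject.onNext(String.valueOf(i)); // stored, not delivered
        }
        subject.onComplete();                  // "first" receives "3" here
        subject.subscribe(v -> seen.add("second:" + v)); // receives "3" at once

        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println); // prints first:3 then second:3
    }
}
```

Note that the late subscriber's callback runs inside subscribe itself, which matches the second log line above carrying the main thread's name instead of a computation thread's.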

Developers will find AsyncSubject useful wherever they need to wait for a background operation to complete and then use (and reuse) its result later. One example that comes to mind right away is loading a list of categories: you initiate loading and wait until the Subject has the list. Afterward, every future subscriber immediately receives the stored value.
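
The category-loading scenario could look roughly like the sketch below. To keep it runnable without RxJava on the classpath, it imitates the wait-then-replay behavior with plain Java and a background thread; in a real application the AsyncSubject would play this role. Everything named here (CategoryCacheSketch, loadCategories, startLoading, and the sample category names) is a hypothetical placeholder, not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

class CategoryCacheSketch {

    // Hypothetical stand-in for a slow network or database call.
    static List<String> loadCategories() {
        return Arrays.asList("Books", "Music", "Games");
    }

    private final List<Consumer<List<String>>> waiting = new ArrayList<>();
    private List<String> categories; // null until the load has finished

    // Starts the load on a background thread and returns that thread.
    Thread startLoading() {
        Thread worker = new Thread(() -> {
            List<String> loaded = loadCategories();
            List<Consumer<List<String>>> toNotify;
            synchronized (this) {
                categories = loaded;              // store the result for reuse
                toNotify = new ArrayList<>(waiting);
                waiting.clear();
            }
            // Deliver outside the lock to everyone who subscribed early.
            for (Consumer<List<String>> s : toNotify) s.accept(loaded);
        }, "category-loader");
        worker.start();
        return worker;
    }

    // Before the load finishes: wait. Afterward: receive the stored list at once.
    void subscribe(Consumer<List<String>> subscriber) {
        List<String> ready;
        synchronized (this) {
            if (categories == null) {
                waiting.add(subscriber);
                return;
            }
            ready = categories;
        }
        subscriber.accept(ready);
    }

    static List<String> demo() {
        CategoryCacheSketch cache = new CategoryCacheSketch();
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        cache.subscribe(c -> log.add("early " + c.size())); // waits for the load
        Thread worker = cache.startLoading();
        try {
            worker.join();                                   // load has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        cache.subscribe(c -> log.add("late " + c.size()));  // served immediately

        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The early subscriber is notified on the loader thread, while the late one is served on whichever thread calls subscribe, which is the same split seen in the thread names of the log output earlier.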
