PublishSubject

One of the most straightforward Subject implementations is PublishSubject. It simply relays every element it receives to whichever subscribers it has at that moment, so it can work as a message bus between different peers.

That's explained best by an example. Consider the following code:

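// Assumes RxJava 2 package names; RxJava 3 uses io.reactivex.rxjava3.core and io.reactivex.rxjava3.subjects.
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.util.concurrent.TimeUnit;

Subject<Long> subject = PublishSubject.create();

// Source: emits 0 to 4, one value every two seconds, into the subject.
Observable.interval(2, TimeUnit.SECONDS)
        .take(5)
        .doOnSubscribe((d) -> log("Original-doOnSubscribe"))
        .doOnComplete(() -> log("Original-doOnComplete"))
        .subscribe(subject);

// First subscriber: attached before any value is emitted.
subject
        .doOnSubscribe((d) -> log("First-doOnSubscribe"))
        .doOnComplete(() -> log("First-doOnComplete"))
        .subscribe(v -> log("First: " + v));

// Wait just over four seconds, so values 0 and 1 have already been relayed.
Thread.sleep(4100);

// Second subscriber: attached late.
subject
        .doOnSubscribe((d) -> log("Second-doOnSubscribe"))
        .doOnComplete(() -> log("Second-doOnComplete"))
        .subscribe(v -> log("Second: " + v));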

Here, we create a PublishSubject and subscribe it to a source that emits five items, one every two seconds:

Observable.interval(2, TimeUnit.SECONDS)

Next, we immediately start listening to the PublishSubject by subscribing to it with the following:

subject
        .doOnSubscribe((d) -> log("First-doOnSubscribe"))
        .doOnComplete(() -> log("First-doOnComplete"))
        .subscribe(v -> log("First: " + v));

Afterward, we wait just over four seconds (4,100 ms), long enough for the first two values to be emitted:

Thread.sleep(4100);

Then, we start the second subscription with the following:

subject
        .doOnSubscribe((d) -> log("Second-doOnSubscribe"))
        .doOnComplete(() -> log("Second-doOnComplete"))
        .subscribe(v -> log("Second: " + v));

Also, we have added these calls to better understand the way the flow works:

        .doOnSubscribe((d) -> log("..."))
        .doOnComplete(() -> log("..."))

So, what's the output? Take a quick look at the following:

main:Original-doOnSubscribe
main:First-doOnSubscribe
RxComputationThreadPool-1:First: 0
RxComputationThreadPool-1:First: 1
main:Second-doOnSubscribe
RxComputationThreadPool-1:First: 2
RxComputationThreadPool-1:Second: 2
RxComputationThreadPool-1:First: 3
RxComputationThreadPool-1:Second: 3
RxComputationThreadPool-1:First: 4
RxComputationThreadPool-1:Second: 4
RxComputationThreadPool-1:Original-doOnComplete
RxComputationThreadPool-1:First-doOnComplete
RxComputationThreadPool-1:Second-doOnComplete

We can see that before the second subscription started, only the first one was receiving values:

RxComputationThreadPool-1:First: 0
RxComputationThreadPool-1:First: 1

The first value received by the second subscription is 2:

main:Second-doOnSubscribe
[...]
RxComputationThreadPool-1:Second: 2

It means that the second subscription missed the first two values, 0 and 1, because it wasn't subscribed yet when they were emitted. As we will see later, that's not always the case with other Subjects.
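To make the missed values easier to see, here is a minimal plain-Java sketch of the same scenario without timers or threads. RelaySubject is a hypothetical stand-in written only for this illustration; it is not RxJava's PublishSubject, and it assumes nothing more than "forward each value to whoever is subscribed right now":

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LateSubscriberSketch {

    // Hypothetical stand-in for a PublishSubject: it keeps no history and
    // forwards each value only to the subscribers registered at that moment.
    static class RelaySubject<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Replays the scenario from the text and returns what each subscriber logged.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        RelaySubject<Long> subject = new RelaySubject<>();

        subject.subscribe(v -> log.add("First: " + v));
        subject.onNext(0L);
        subject.onNext(1L);

        // The second subscriber joins after 0 and 1 have already been relayed.
        subject.subscribe(v -> log.add("Second: " + v));
        subject.onNext(2L);
        subject.onNext(3L);
        subject.onNext(4L);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the stand-in stores nothing, there is simply no way for the second subscriber to learn about 0 and 1, which mirrors what the output above shows.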

From then on, the second subscription received each subsequent value just as the first one did, and both completed together with the source, as seen from this:

RxComputationThreadPool-1:Original-doOnComplete
RxComputationThreadPool-1:First-doOnComplete
RxComputationThreadPool-1:Second-doOnComplete

As we can see from all this, PublishSubject has simple and straightforward behavior: it just keeps relaying the values that it receives, and it completes when the source of those values completes.
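The completion half of that behavior can be sketched in plain Java as well. Again, Listener and RelaySubject are hypothetical names made up for this illustration, not RxJava types. The handling of a subscriber that arrives after completion (it receives only the completion signal) is an assumption of this sketch that goes beyond the example above, although to our knowledge RxJava's PublishSubject treats late subscribers in a similar way:

```java
import java.util.ArrayList;
import java.util.List;

class CompletionSketch {

    // Hypothetical observer type for this sketch (not RxJava's Observer).
    interface Listener<T> {
        void onNext(T value);
        void onComplete();
    }

    // Hypothetical stand-in for a PublishSubject that also relays completion.
    static class RelaySubject<T> implements Listener<T> {
        private final List<Listener<T>> subscribers = new ArrayList<>();
        private boolean completed;

        void subscribe(Listener<T> subscriber) {
            if (completed) {
                // Assumption: a subscriber arriving after completion
                // only sees the completion signal.
                subscriber.onComplete();
                return;
            }
            subscribers.add(subscriber);
        }

        @Override
        public void onNext(T value) {
            if (completed) {
                return; // values after completion are dropped
            }
            for (Listener<T> subscriber : subscribers) {
                subscriber.onNext(value);
            }
        }

        @Override
        public void onComplete() {
            if (completed) {
                return;
            }
            completed = true;
            for (Listener<T> subscriber : subscribers) {
                subscriber.onComplete();
            }
            subscribers.clear();
        }
    }

    // Builds a listener that records everything it sees under the given name.
    static Listener<Long> named(String name, List<String> log) {
        return new Listener<Long>() {
            @Override public void onNext(Long value) { log.add(name + ": " + value); }
            @Override public void onComplete() { log.add(name + "-complete"); }
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        RelaySubject<Long> subject = new RelaySubject<>();
        subject.subscribe(named("First", log));
        subject.subscribe(named("Second", log));

        subject.onNext(4L);
        subject.onComplete();               // the source finishing ends both subscriptions
        subject.onNext(5L);                 // ignored: the subject has already completed
        subject.subscribe(named("Late", log));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Calling CompletionSketch.run() records the last value for both subscribers, then one completion entry for each of them, and finally a completion entry for the late subscriber.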
