The Subject instances are both Observable instances and Observer instances. Like Observable instances, they can have multiple Observer instances, receiving the same notifications. That's why they can be used to turn cold Observable instances into hot ones. Like Observer instances, they give us access to their onNext(), onError(), and onCompleted() methods.
Let's look at an implementation of the preceding hot interval examples, using a Subject instance:
Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS); // (1)
Subject<Long, Long> publishSubject = PublishSubject.create(); // (2)
interval.subscribe(publishSubject);
// (3)
Subscription sub1 = subscribePrint(publishSubject, "First");
Subscription sub2 = subscribePrint(publishSubject, "Second");
Subscription sub3 = null;
try {
    Thread.sleep(300L);
    publishSubject.onNext(555L); // (4)
    sub3 = subscribePrint(publishSubject, "Third"); // (5)
    Thread.sleep(500L);
} catch (InterruptedException e) {}
sub1.unsubscribe(); // (6)
sub2.unsubscribe();
sub3.unsubscribe();
The example is slightly different now:
1. The interval Observable instance is created the same way as before.
2. Here, we create a PublishSubject instance, a Subject instance that emits to an Observer instance only those items that are emitted by the source Observable instance subsequent to the time of the subscription. This behavior is similar to that of the ConnectableObservable instance created by the publish() method. The new Subject instance is subscribed to the interval Observable instance, created by the interval factory method, which is possible because the Subject class implements the Observer interface. Also, note that the Subject signature has two generic types: one for the type of notifications the Subject instance will receive and another for the type of the notifications it will emit. The PublishSubject class has the same type for its input and output notifications. Note that it is possible to create a PublishSubject instance without subscribing to a source Observable instance. It will emit only the notifications passed to its onNext() and onError() methods and will complete when its onCompleted() method is called.
3. We can subscribe to the Subject instance; it is an Observable instance after all.
4. We can emit a custom notification at any time by calling the onNext() method; it is broadcast to all current subscribers. We could also call the onCompleted() method and close the notification stream.
5. The third subscriber subscribes late, so it receives only the notifications emitted after its subscription.
6. When all the subscribers unsubscribe, the Subject instance will continue emitting.
This example's source code can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/SubjectsDemonstration.java.
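The publish-style behavior described above (only items emitted after the moment of subscription reach a subscriber) can be sketched without RxJava. The class below is a hypothetical, simplified model written in plain Java: the name MiniPublishSubject and its subscribe()/onNext() signatures are assumptions made for illustration, and it is not how RxJava's PublishSubject is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical, simplified model of publish-style multicasting; not RxJava's PublishSubject. */
public class MiniPublishSubject<T> {
    // Observers currently subscribed; copy-on-write so unsubscribing during delivery is safe.
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    /** Registers an observer and returns an action that unsubscribes it. */
    public Runnable subscribe(Consumer<T> observer) {
        observers.add(observer);
        return () -> observers.remove(observer);
    }

    /** Broadcasts an item to everyone subscribed at this moment; nothing is cached. */
    public void onNext(T item) {
        for (Consumer<T> observer : observers) {
            observer.accept(item);
        }
    }

    public static void main(String[] args) {
        MiniPublishSubject<Long> subject = new MiniPublishSubject<>();
        List<Long> first = new ArrayList<>();
        List<Long> third = new ArrayList<>();

        Runnable sub1 = subject.subscribe(first::add);
        subject.onNext(0L);
        subject.onNext(555L);                          // custom notification
        Runnable sub3 = subject.subscribe(third::add); // late subscriber
        subject.onNext(1L);

        sub1.run();
        sub3.run();
        subject.onNext(2L); // still accepted, but nobody is listening

        System.out.println("First: " + first); // First: [0, 555, 1]
        System.out.println("Third: " + third); // Third: [1]
    }
}
```

The late subscriber misses 0 and 555 because the model keeps no buffer, which is the property that makes this kind of subject hot.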
There are four types of subjects that come with RxJava:
PublishSubject: This is the one we saw in the previous example, behaving like ConnectableObservable, created using the publish() method.
ReplaySubject: This emits to any observer all of the items that were emitted by the source Observable instance, regardless of when the observer subscribes. So, it behaves like ConnectableObservable, created using the replay() method. The ReplaySubject class has many factory methods. The default one caches everything; keep this in mind, because it can eat up memory. There are factory methods for creating it with size-bound and/or time-bound buffers. As with the PublishSubject class, this one can be used without a source Observable instance. All of the notifications emitted using its onNext(), onError(), and onCompleted() methods will be emitted to every Subscriber, even if it is subscribed after invoking the on* methods.
BehaviorSubject: When an observer subscribes to it, it emits the item most recently emitted by the source Observable instance (or a seed/default value if none have yet been emitted) and then continues to emit any other items emitted later by the source Observable instance. The BehaviorSubject class is almost like the ReplaySubject class with a buffer size of one. The BehaviorSubject class can be used to implement a stateful reactive instance, that is, a reactive property. Again, a source Observable instance is not needed.
AsyncSubject: This emits the last value (and only that) emitted by the source Observable instance, and only after the source Observable instance completes. If the source Observable instance does not emit any values, the AsyncSubject instance also completes without emitting any values. This is something like a promise in RxJava's world. A source Observable instance is not needed; the value, the error, or the OnCompleted notification can be passed to it by invoking the on* methods.
Using subjects may seem like a cool way to solve various problems, but you should generally avoid them, or at least encapsulate them and their behavior in a method that returns a result of type Observable.
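To compare the four subject types side by side, the following plain-Java sketch models what a late subscriber would receive from each of them. It is a hypothetical illustration that does not use RxJava: the LateSubscriberModel class, the Kind enum, and the received() method are names invented here, the replay case assumes an unbounded buffer, and the source is assumed to complete normally after its last item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of what a late subscriber receives; it does not use RxJava. */
public class LateSubscriberModel {
    public enum Kind { PUBLISH, REPLAY, BEHAVIOR, ASYNC }

    /**
     * before: items emitted before the subscription.
     * after:  items emitted afterwards; the source is assumed to complete after them.
     * seed:   the default value used by the BEHAVIOR case when nothing was emitted yet.
     */
    public static <T> List<T> received(Kind kind, List<T> before, List<T> after, T seed) {
        List<T> result = new ArrayList<>();
        switch (kind) {
            case PUBLISH:  // only what arrives after subscribing
                result.addAll(after);
                break;
            case REPLAY:   // everything, assuming an unbounded buffer
                result.addAll(before);
                result.addAll(after);
                break;
            case BEHAVIOR: // most recent item (or the seed), then the rest
                result.add(before.isEmpty() ? seed : before.get(before.size() - 1));
                result.addAll(after);
                break;
            case ASYNC:    // only the very last item, delivered on completion
                List<T> all = new ArrayList<>(before);
                all.addAll(after);
                if (!all.isEmpty()) {
                    result.add(all.get(all.size() - 1));
                }
                break;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> before = Arrays.asList(1, 2);
        List<Integer> after = Arrays.asList(3, 4);
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " -> " + received(kind, before, after, 0));
        }
        // PUBLISH -> [3, 4]
        // REPLAY -> [1, 2, 3, 4]
        // BEHAVIOR -> [2, 3, 4]
        // ASYNC -> [4]
    }
}
```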
The danger with Subject instances is that they give access to the onNext(), onError(), and onCompleted() methods, and your logic can get messy (they need to be called following the Rx contract, cited earlier in this chapter). They can be misused very easily.
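As a reminder of what following the contract means, here is a small plain-Java sketch: zero or more onNext() calls, followed by at most one terminal notification (onError() or onCompleted()), and nothing after that. The ContractGuard class is hypothetical and only illustrates the ordering rule. Throwing an exception on a violation is a choice made for this sketch, not a description of how RxJava reacts.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical guard illustrating the ordering rule: onNext* then at most one terminal event. */
public class ContractGuard<T> {
    private final List<String> log = new ArrayList<>();
    private boolean terminated = false;

    public void onNext(T item) {
        if (terminated) {
            throw new IllegalStateException("onNext after a terminal notification");
        }
        log.add("next:" + item);
    }

    public void onError(Throwable error) {
        terminate("error:" + error.getMessage());
    }

    public void onCompleted() {
        terminate("completed");
    }

    // Records the terminal notification and rejects a second one.
    private void terminate(String entry) {
        if (terminated) {
            throw new IllegalStateException("second terminal notification");
        }
        terminated = true;
        log.add(entry);
    }

    /** Returns the notifications accepted so far, in order. */
    public List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        ContractGuard<Integer> guard = new ContractGuard<>();
        guard.onNext(1);
        guard.onCompleted();
        try {
            guard.onNext(2); // violates the ordering rule
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println(guard.log()); // [next:1, completed]
    }
}
```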
Prefer a ConnectableObservable instance (that is, one created via the publish() method) over a Subject instance when you need to create a hot Observable instance from a cold one.
But let's look at one good use of a Subject instance: the aforementioned reactive properties. Again, we are going to implement 'The Reactive Sum', but this time it will be quite different. Here is the class defining it:
import rx.Observable;
import rx.subjects.BehaviorSubject;

public class ReactiveSum { // (1)
    private BehaviorSubject<Double> a = BehaviorSubject.create(0.0);
    private BehaviorSubject<Double> b = BehaviorSubject.create(0.0);
    private BehaviorSubject<Double> c = BehaviorSubject.create(0.0);

    public ReactiveSum() { // (2)
        Observable.combineLatest(a, b, (x, y) -> x + y).subscribe(c);
    }

    public double getA() { // (3)
        return a.getValue();
    }

    public void setA(double a) {
        this.a.onNext(a);
    }

    public double getB() {
        return b.getValue();
    }

    public void setB(double b) {
        this.b.onNext(b);
    }

    public double getC() { // (4)
        return c.getValue();
    }

    public Observable<Double> obsC() {
        return c.asObservable();
    }
}
This class has three double properties: two settable properties, a and b, and their sum, c. When a or b changes, c is automatically updated to their sum. There is a special method that we can use to track the changes to c. So how does it work?
1. ReactiveSum is a normal Java class, defining three private fields of type BehaviorSubject<Double>, representing the variables a, b, and c, with default values of zero.
2. In the constructor, we make c depend on both a and b and be equal to their sum, again using the combineLatest() method.
3. The properties a and b have getters and setters. The getters return their current value, which is the last received value. The setters emit the passed value to their Subject instance, making it the last one.
4. The property c is read-only, so it has only a getter, but it can be listened to. This can be done with the obsC() method, which returns it as an Observable instance. Remember, when you use subjects, to always encapsulate them in types or methods and return the observables to the outside world.
This ReactiveSum class can be used like this:
ReactiveSum sum = new ReactiveSum();
subscribePrint(sum.obsC(), "Sum");
sum.setA(5);
sum.setB(4);
This will output the following:
Sum : 0.0
Sum : 5.0
Sum : 9.0
The first value is emitted when the subscribe() method is called (remember that BehaviorSubject instances always emit their last value on subscribing), and the other two are emitted automatically when a or b is set.
The source code for the preceding example can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/ReactiveSumV3.java.
Reactive properties can be used for implementing bindings and counters, so they are very useful for desktop or browser applications. But this example is far from any functional paradigm.