The Subject instances

The Subject instances are both Observable instances and Observer instances. Like Observable instances, they can have multiple Observer instances, receiving the same notifications. That's why they can be used to turn cold Observable instances into hot ones. Like Observer instances, they give us access to their onNext(), onError(), or onCompleted() methods.

Let's look at an implementation of the preceding hot interval examples, using a Subject instance:

Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS); // (1)
Subject<Long, Long> publishSubject = PublishSubject.create(); // (2)
interval.subscribe(publishSubject);
// (3)
Subscription sub1 = subscribePrint(publishSubject, "First");
Subscription sub2 = subscribePrint(publishSubject, "Second");
Subscription sub3 = null;
try {
  Thread.sleep(300L);
  publishSubject.onNext(555L); // (4)
  sub3 = subscribePrint(publishSubject, "Third"); // (5)
  Thread.sleep(500L);
}
catch (InterruptedException e) {}
sub1.unsubscribe(); // (6)
sub2.unsubscribe();
sub3.unsubscribe();

The example is slightly different now:

  1. The interval Observable instance is created the same way as before.
  2. Here, we create a PublishSubject instance—a Subject instance that emits to an Observer instance only those items that are emitted by the source Observable instance subsequent to the time of the subscription. This behavior is similar to that of the ConnectableObservable instance created by the publish() method. The new Subject instance is subscribed to the interval Observable instance , created by the interval factory method, which is possible because the Subject class implements the Observer interface. Also, note that the Subject signature has two generic types—one for the type of notifications the Subject instance will receive and another for the type of the notifications it will emit. The PublishSubject class has the same type for its input and output notifications.

    Note that it is possible to create a PublishSubject instance without subscribing to a source Observable instance. It will emit only the notifications passed to its onNext() and onError() methods and will complete when calling its onCompleted() method.

  3. We can subscribe to the Subject instance; it is an Observable instance after all.
  4. We can emit a custom notification at any time. It will be broadcast to all the subscribers of the subject. We can even call the onCompleted() method and close the notification stream.
  5. The third Subscriber will only receive notifications emitted after it subscribes.
  6. When everything unsubscribes, the Subject instance will continue emitting.

There are four types of subjects that come with RxJava:

  • PublishSubject: This is the one we saw in the previous example, behaving like ConnectableObservable, created using the publish() method.
  • ReplaySubject: This emits to any observer all of the items that were emitted by the source Observable instance, regardless of when the observer subscribes. So, it behaves like ConnectableObservable, created using the replay() method. The ReplaySubject class has many factory methods. The default one caches everything; keep this in mind, because it can eat up memory. There are factory methods for creating it with size-bound and/or time-bound buffers. As with the PublishSubject class, this one can be used without a source Observable instance. All of the notifications emitted using its onNext(), onError(), and onCompleted() methods will be emitted to every Subscriber, even if it is subscribed after invoking the on* methods.
  • BehaviorSubject: When an observer subscribes to it, it emits the item most recently emitted by the source Observable instance (or a seed/default value if none have yet been emitted) and then continues to emit any other items emitted later by the source Observable instance. The BehaviorSubject class is almost like the ReplaySubjects class with a buffer size of one. The BehaviorSubject class can be used to implement a stateful reactive instance—a reactive property. Again, a source Observable instance is not needed.
  • AsyncSubject: This emits the last value (and only that) emitted by the source Observable instance, and only after the source Observable instance completes. If the source Observable instance does not emit any values, the AsyncSubject instance also completes without emitting any values. This is something like a promise in RxJava's world. A source Observable instance is not needed; the value, the error, or the OnCompleted notification can be passed to it by invoking the on* methods.

Using subjects may seem a cool way to solve various problems, but you should avoid using them. Or, at least implement them and their behavior in a method that returns a result of type Observable.

The danger with the Subject instance is that they give access to the onNext(), onError(), and onCompleted() methods, and your logic can get messy (they need to be called following the Rx contract, cited earlier in this chapter). They can be misused very easily.

Opt for using the ConnecatableObservable instance (that is, via the publish() method) over the Subject, when you need to create a hot Observable from a cold one.

But let's look at one good use of a Subject instance—the aforementioned reactive properties. Again, we are going to implement 'The Reactive Sum', but this time it will be quite different. Here is the class defining it:

public class ReactiveSum { // (1)
  private BehaviorSubject<Double> a = BehaviorSubject.create(0.0);
  private BehaviorSubject<Double> b = BehaviorSubject.create(0.0);
  private BehaviorSubject<Double> c = BehaviorSubject.create(0.0);
  public ReactiveSum() { // (2)
    Observable.combineLatest(a, b, (x, y) -> x + y).subscribe(c);
  }
  public double getA() { // (3)
    return a.getValue();
  }
  public void setA(double a) {
    this.a.onNext(a);
  }
  public double getB() {
    return b.getValue();
  }
  public void setB(double b) {
    this.b.onNext(b);
  }
  public double getC() { // (4)
    return c.getValue();
  }
  public Observable<Double> obsC() {
    return c.asObservable();
  }
}

This class has three double properties: two settable properties, a and b, and their sum, c. When a or b changes, c is automatically updated to their sum. There is a special method that we can use to track the changes to c. So how does it work?

  1. ReactiveSum is a normal Java class, defining three private fields of type BehaviorSubject<Double>, representing the variables a, b, and c, and with default values of zero.
  2. In the constructor, we subscribe c to depend on both a and b and to be equal to their sum, again, using combineLatest() method.
  3. The properties a and b have getters and setters. The getters return their current value—the last received value. The setters emit the passed value to their Subject instance, making it the last one.

    Note

    The getValue() method of the BehaviorSubject parameter is used for retrieving it. It is available at RxJava 1.0.5.

  4. The property c is read-only, so it has only a getter, but it can be listened to. This can be done with the obsC() method, which returns it as an Observable instance. Remember, when you use subjects, to always encapsulate them in types or methods and return the observables to the outside world.

This ReactiveSum class can be used like this:

ReactiveSum sum = new ReactiveSum();
subscribePrint(sum.obsC(), "Sum");
sum.setA(5);
sum.setB(4);

This will output the following:

Sum : 0.0
Sum : 5.0
Sum : 9.0

The first value is emitted on the subscribe () method (remember the BehaviorSubject instances always emit their last value on subscribing), and the other two will automatically be emitted on setting a or b.

Reactive properties can be used for implementing bindings and counters, so they are very useful for desktop or browser applications. But this example is far from any functional paradigm.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.14.251.128