Chapter 2. A Look at Reactive Extensions

Reactive Extensions—or Rx—is a Reactive Programming library from Microsoft to build complex asynchronous programs. It models time-varying values and events as observable sequences and is implemented by extending the Observer design pattern.

Its first target platform was .NET, but Netflix has ported Rx to the JVM under the name RxJava. Microsoft also develops and maintains a port of Rx to JavaScript called RxJS, which is the tool we used to build the sine-wave application. The two ports work a treat for us since Clojure runs on the JVM and ClojureScript in JavaScript environments.

As we saw in Chapter 1, What is Reactive Programming?, Rx is inspired by Functional Reactive Programming but uses different terminology. In FRP, the two main abstractions are behaviors and events. Although the implementation details are different, observable sequences represent events. Rx also provides a behavior-like abstraction through another data type called BehaviorSubject.

In this chapter, we will:

  • Explore Rx's main abstraction: observables
  • Learn about the duality between iterators and observables
  • Create and manipulate observable sequences

The Observer pattern revisited

In Chapter 1, What is Reactive Programming?, we saw a brief overview of the Observer design pattern and a simple implementation of it in Clojure using watches. Here's how we did it:

(def numbers (atom []))

(defn adder [key ref old-state new-state]
  (print "Current sum is " (reduce + new-state)))

(add-watch numbers :adder adder) 

In the preceding example, our observable subject is the var, numbers. The observer is the adder watch. When the observable changes, it pushes its changes to the observer synchronously.

Now, contrast this to working with sequences:

(->> [1 2 3 4 5 6]
     (map inc)
     (filter even?)
     (reduce +))

This time around, the vector is the subject being observed and the functions processing it can be thought of as the observers. However, this works in a pull-based model. The vector doesn't push any elements down the sequence. Instead, map and friends ask the sequence for more elements. This is a synchronous operation.

Rx makes sequences—and more—behave like observables so that you can still map, filter, and compose them just as you would compose functions over normal sequences.

Observer – an Iterator's dual

Clojure's sequence operators such as map, filter, reduce, and so on support Java Iterables. As the name implies, an Iterable is an object that can be iterated over. At a low level, this is supported by retrieving an Iterator reference from such object. Java's Iterator interface looks like the following:

public interface Iterator<E> {
    boolean hasNext();
    E next();
    void remove();
}

When passed in an object that implements this interface, Clojure's sequence operators pull data from it by using the next method, while using the hasNext method to know when to stop.

Tip

The remove method is required to remove its last element from the underlying collection. This in-place mutation is clearly unsafe in a multithreaded environment. Whenever Clojure implements this interface for the purposes of interoperability, the remove method simply throws UnsupportedOperationException.

An observable, on the other hand, has observers subscribed to it. Observers have the following interface:

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

As we can see, an Observer implementing this interface will have its onNext method called with the next value available from whatever observable it's subscribed to. Hence, it being a push-based notification model.

This duality [4] becomes clearer if we look at both the interfaces side by side:

Iterator<E> {                       Observer<T> {
    boolean hasNext();                  void onCompleted();
    E next();                           void onError(Throwable e);
    void remove();                      void onNext(T t);
}                                       }

Observables provide the ability to have producers push items asynchronously to consumers. A few examples will help solidify our understanding.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.129.253