Removing incidental complexity with RxClojure

In Chapter 2, A Look at Reactive Extensions, we learned about the basic building blocks of RxClojure, an open-source CES framework. In this section, we'll use this knowledge in order to remove the incidental complexity from our program. This will give us a clear, declarative way to display both prices and rolling averages.

The UI code we've written so far remains unchanged, but we need to make sure RxClojure is declared in the dependencies section of our project.clj file:

[io.reactivex/rxclojure "1.0.0"]

Then, ensure we require the following library:

(ns stock-market-monitor.core
  (:require [rx.lang.clojure.core :as rx]
            [seesaw.core :refer :all])
  (:import (java.util.concurrent TimeUnit)
           (rx Observable)))

The way we approach the problem this time is also different. Let's take a look at the first requirement: it requires we display the current price of a company's share in the stock market.

Every time we query the price service, we get a—possibly different—price for the company in question. As we saw in Chapter 2, A Look at Reactive Extensions, modeling this as an observable sequence is easy, so we'll start with that. We'll create a function that gives us back a stock price observable for the given company:

(defn make-price-obs [company-code]
  (rx/return (share-price company-code)))

This is an observable that yields a single value and terminates. It's equivalent to the following marble diagram:

Removing incidental complexity with RxClojure

Part of the first requirement is that we query the service on a predefined time interval—every 500 milliseconds in this case. This hints at an observable we have encountered before, aptly named interval. In order to get the polling behavior we want, we need to combine the interval and the price observables.

As you probably recall, flatmap is the tool for the job here:

(rx/flatmap (fn [_] (make-price-obs "XYZ"))
                    (Observable/interval 500 
                                         TimeUnit/MILLISECONDS))

The preceding snippet creates an observable that will yield the latest stock price for XYZ every 500 milliseconds indefinitely. It corresponds to the following diagram:

Removing incidental complexity with RxClojure

In fact, we can simply subscribe to this new observable and test it out. Modify your main function to the following snippet and run the program:

(defn -main [& args]
  (show! main-frame)
  (let [price-obs (rx/flatmap (fn [_] (make-price-obs "XYZ"))
                              (Observable/interval 500 TimeUnit/MILLISECONDS))]
    (rx/subscribe price-obs
                  (fn [price]
                    (text! price-label (str "Price: " price))))))

This is very cool! We replicated the behavior of our first program with only a few lines of code. The best part is that we did not have to worry about thread pools or scheduling actions. By thinking about the problem in terms of observable sequences, as well as combining existing and new observables, we were able to declaratively express what we want the program to do.

This already provides great benefits in maintainability and readability. However, we are still missing the other half of our program: rolling averages.

Observable rolling averages

It might not be immediately obvious how we can model rolling averages as observables. What we need to keep in mind is that pretty much anything we can think of as a sequence of values, we can probably model as an observable sequence.

Rolling averages are no different. Let's forget for a moment that the prices are coming from a network call wrapped in an observable. Let's imagine we have all values we care about in a Clojure vector:

(def values (range 10))

What we need is a way to process these values in partitions—or buffers—of size 5 in such a way that only a single value is dropped at each interaction. In Clojure, we can use the partition function for this purpose:

(doseq [buffer (partition 5 1 values)]
  (prn buffer))

(0 1 2 3 4)
(1 2 3 4 5)
(2 3 4 5 6)
(3 4 5 6 7)
(4 5 6 7 8)
...

The second argument to the partition function is called a step and it is the offset of how many items should be skipped before starting a new partition. Here, we set it to 1 in order to create the sliding window effect we need.

The big question then is: can we somehow leverage partition when working with observable sequences?

It turns out that RxJava has a transformer called buffer just for this purpose. The previous example can be rewritten as follows:

(-> (rx/seq->o (vec (range 10)))
    (.buffer 5 1)
    (rx/subscribe
     (fn [price]
       (prn (str "Value: " price)))))

Tip

As mentioned previously, not all RxJava's API is exposed through RxClojure, so here we need to use interop to access the buffer method from the observable sequence.

As before, the second argument to buffer is the offset, but it's called skip in the RxJava documentation. If you run this at the REPL you'll see the following output:

"Value: [0, 1, 2, 3, 4]"
"Value: [1, 2, 3, 4, 5]"
"Value: [2, 3, 4, 5, 6]"
"Value: [3, 4, 5, 6, 7]"
"Value: [4, 5, 6, 7, 8]"
...

This is exactly what we want. The only difference is that the buffer method waits until it has enough elements—five in this case—before proceeding.

Now, we can go back to our program and incorporate this idea with our main function. Here is what it looks like:

(defn -main [& args]
  (show! main-frame)
  (let [price-obs (-> (rx/flatmap make-price-obs
                                  (Observable/interval 500 TimeUnit/MILLISECONDS))
                      (.publish))
        sliding-buffer-obs (.buffer price-obs 5 1)]
    (rx/subscribe price-obs
                  (fn [price]
                    (text! price-label (str "Price: " price))))
    (rx/subscribe sliding-buffer-obs
                  (fn [buffer]
                    (text! running-avg-label (str "Running average: " (avg buffer)))))
    (.connect price-obs)))

The preceding snippet works by creating two observables. The first one, price-obs, we had created before. The new sliding buffer observable is created using the buffer transformer on price-obs.

We can, then, independently subscribe to each one in order to update the price and rolling average labels. Running the program will display the same screen we've seen previously:

Observable rolling averages

You might have noticed two method calls we hadn't seen before: publish and connect.

The publish method returns a connectable observable. This means that the observable won't start emitting values until its connect method has been called. We do this here because we want to make sure that all the subscribers receive all the values emitted by the original observable.

In conclusion, without much additional code, we implemented all requirements in a concise, declarative manner that is easy to maintain and follow. We have also made the previous roll-buffer function completely unnecessary.

The full source code for the CES version of the program is given here for reference:

(ns stock-market-monitor.05frp-price-monitor-rolling-avg
  (:require [rx.lang.clojure.core :as rx]
            [seesaw.core :refer :all])
  (:import (java.util.concurrent TimeUnit)
           (rx Observable)))

(native!)

(def main-frame (frame :title "Stock price monitor"
                       :width 200 :height 100
                       :on-close :exit))

(def price-label       (label "Price: -"))
(def running-avg-label (label "Running average: -"))

(config! main-frame :content
         (border-panel
          :north  price-label
          :center running-avg-label
          :border 5))

(defn share-price [company-code]
  (Thread/sleep 200)
  (rand-int 1000))

(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(defn make-price-obs [_]
  (rx/return (share-price "XYZ")))

(defn -main [& args]
  (show! main-frame)
  (let [price-obs (-> (rx/flatmap make-price-obs
                                  (Observable/interval 500 TimeUnit/MILLISECONDS))
                      (.publish))
        sliding-buffer-obs (.buffer price-obs 5 1)]
    (rx/subscribe price-obs
                  (fn [price]
                    (text! price-label (str "Price: " price))))
    (rx/subscribe sliding-buffer-obs
                  (fn [buffer]
                    (text! running-avg-label (str "Running average: " (avg buffer)))))
    (.connect price-obs)))

Note how in this version of the program, we didn't have to use a shutdown hook. This is because RxClojure creates daemon threads, which are automatically terminated once the application exits.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.137.174.23