Rewriting the stock market application with core.async

By using an example we are familiar with, we can focus on the differences between the approaches discussed so far without getting sidetracked by new, domain-specific rules.

Before we dive into the implementation, let's quickly go over how our solution should work.

Just like in our previous implementations, we have a service from which we can query share prices. Where our approach differs, however, is a direct consequence of how core.async channels work.

On a given schedule, we would like to write the current price to a core.async channel. This might look like the following:

[Figure: a process writing share prices to the out channel at regular intervals]
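In code, the idea could be sketched roughly as follows. This is only an illustration under a few assumptions: the names fetch-price and start-price-feed are hypothetical, the price source is a stand-in random generator, and the loop runs a fixed number of times so that the example terminates.

```clojure
(require '[clojure.core.async
           :refer [chan go-loop timeout >! <! <!!] :as async])

;; Hypothetical stand-in for the real price service.
(defn fetch-price []
  (rand-int 1000))

;; Puts `n` prices on `out`, one every `msecs` milliseconds, then closes it.
(defn start-price-feed [out msecs n]
  (go-loop [remaining n]
    (if (pos? remaining)
      (do
        (<! (timeout msecs))     ; park until the next tick
        (>! out (fetch-price))   ; park until the price is taken
        (recur (dec remaining)))
      (async/close! out))))

(def out (chan))
(start-price-feed out 10 3)

;; Collect everything the feed produced before the channel closed.
(def received (<!! (async/into [] out)))
```

The real application never closes the channel, so it keeps producing prices for as long as the program runs.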

This process will continuously put prices in the out channel. We need to do two things with each price: display it and display the calculated sliding window. Since we like our functions decoupled, we will use two go blocks, one for each task:

[Figure: two go blocks taking prices from the same out channel]

Hold on. Something is off with our approach. Once a price is taken from the out channel, it is no longer available to other go blocks. So, instead of starting its calculation with 10, the sliding window function ends up getting the second value, 20. With this approach, the sliding window is calculated from roughly every other item, depending on how consistently the two go blocks interleave.

Clearly, this is not what we want, but it helps us think about the problem a little more. The semantics of core.async prevent us from reading a value from a channel more than once. Most of the time, this behavior is just fine, especially if you think of channels as queues. So how can we provide the same value to both functions?
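The behavior is easy to reproduce at the REPL. The following sketch assumes a buffered channel preloaded with 10, 20, 30, and 40 (the names price-seen and window-seen are illustrative) and uses blocking takes so that the outcome is deterministic:

```clojure
(require '[clojure.core.async :refer [chan >!! <!! close!]])

(def out (chan 4))

(doseq [price [10 20 30 40]]
  (>!! out price))
(close! out)

;; Two consumers taking turns on the same channel.
(def price-seen  (<!! out))   ; the first consumer gets 10
(def window-seen (<!! out))   ; the second consumer gets 20, never 10

;; The remaining values, followed by nil once the closed channel is drained.
(def remaining [(<!! out) (<!! out) (<!! out)])
```

Each value is handed to exactly one taker, which is why two go blocks reading from the same channel end up splitting the stream between them.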

To solve this problem, we will take advantage of another channel constructor provided by core.async, in its clojure.core.async.lab namespace, called broadcast. As the name implies, broadcast returns a channel which, when written to, writes its value to each of the channels passed to it as arguments. Effectively, this changes our high-level picture to something like the following:

[Figure: a broadcast channel forwarding each price to the prices and sliding window channels]

In summary, we will have a go loop writing prices to this broadcast channel, which will then forward its values to the two channels from which we will be operating: prices and the sliding window.
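The same fan-out idea can be tried in isolation. Note that the sketch below does not use broadcast: it swaps in mult and tap from the main clojure.core.async namespace, a related facility, purely to show one value reaching two channels. The channel names and buffer sizes are illustrative assumptions.

```clojure
(require '[clojure.core.async :refer [chan mult tap >!! <!! close!]])

(def source  (chan))          ; the single channel prices are written to
(def fan-out (mult source))   ; distributes each value to every tap

;; tap returns the channel it was given, so it can be bound directly.
(def prices-ch (tap fan-out (chan 1)))
(def window-ch (tap fan-out (chan 1)))

(>!! source 10)

;; Both consumers observe the same value.
(def from-prices (<!! prices-ch))
(def from-window (<!! window-ch))

(close! source)
```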

With the general idea in place, we are ready to dive into the code.

Implementing the application code

We already have a project depending on core.async that we created in the previous section, so we'll be working off that. Let's start by adding an extra dependency on seesaw to your project.clj file:

  :dependencies [[org.clojure/clojure "1.5.1"]
                 [org.clojure/core.async "0.1.278.0-76b25b-alpha"]
                 [seesaw "1.4.4"]]

Next, create a file called stock_market.clj in the src directory and add this namespace declaration:

(ns core-async-playground.stock-market
  (:require [clojure.core.async
             :refer [go chan <! >! timeout go-loop map>] :as async]
            [clojure.core.async.lab :refer [broadcast]])
  (:use [seesaw.core]))

This might be a good point to restart your REPL if you haven't done so. Don't worry about any functions we haven't seen yet. We'll get a feel for them in this section.

The GUI code remains largely unchanged, so no explanation should be necessary for the next snippet:

(native!)

(def main-frame (frame :title "Stock price monitor"
                       :width 200 :height 100
                       :on-close :exit))

(def price-label       (label "Price: -"))
(def running-avg-label (label "Running average: -"))

(config! main-frame :content
         (border-panel
          :north  price-label
          :center running-avg-label
          :border 5))

(defn share-price [company-code]
  (Thread/sleep 200)
  (rand-int 1000))

(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(defn roll-buffer [buffer val buffer-size]
  (let [buffer (conj buffer val)]
    (if (> (count buffer) buffer-size)
      (pop buffer)
      buffer)))

(defn make-sliding-buffer [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size))))

(def sliding-buffer (make-sliding-buffer 5))

The only difference is that now we have a sliding-buffer function that returns a window of data. This is in contrast with our original application, where the rolling-avg function was responsible for both creating the window and calculating the average. This new design is more general as it makes this function easier to reuse. The sliding logic is the same, however.
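To get a feel for how the window behaves on its own, here is a small REPL-style sketch. The helper definitions are repeated from the listing above so that the snippet stands alone; the window size of 3 and the names window, snapshots, and latest-avg are illustrative choices, not part of the application.

```clojure
(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(defn roll-buffer [buffer val buffer-size]
  (let [buffer (conj buffer val)]
    (if (> (count buffer) buffer-size)
      (pop buffer)      ; drop the oldest item once the window is full
      buffer)))

(defn make-sliding-buffer [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size))))

;; A smaller window than the application's, to keep the output short.
(def window (make-sliding-buffer 3))

;; Each call returns the window as it stands after adding the new value.
(def snapshots (mapv (comp vec window) [10 20 30 40 50]))
;; => [[10] [10 20] [10 20 30] [20 30 40] [30 40 50]]

;; Adding 60 leaves 40, 50 and 60 in the window.
(def latest-avg (avg (window 60)))
;; => 50.0
```

Because the function returns the window itself rather than a number, callers are free to compute an average, a maximum, or anything else over it.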

Next, we have our main application logic using core.async:

(defn broadcast-at-interval [msecs task & ports]
  (go-loop [out (apply broadcast ports)]
    (<! (timeout msecs))
    (>! out (task))
    (recur out)))

(defn -main [& args]
  (show! main-frame)
  (let [prices-ch         (chan)
        sliding-buffer-ch (map> sliding-buffer (chan))]
    (broadcast-at-interval 500 #(share-price "XYZ") prices-ch sliding-buffer-ch)
    (go-loop []
      (when-let [price (<! prices-ch)]
        (text! price-label (str "Price: " price))
        (recur)))
    (go-loop []
      (when-let [buffer (<! sliding-buffer-ch)]
        (text! running-avg-label (str "Running average: " (avg buffer)))
        (recur)))))

Let's walk through the code.

The first function, broadcast-at-interval, is responsible for creating the broadcasting channel. It receives a variable number of arguments: a number of milliseconds describing the interval, the function representing the task to be executed, and a sequence of one or more output channels. These channels are used to create the broadcasting channel to which the go loop will be writing prices.

Next, we have our main function. The let block is where the interesting bits are. As we discussed in our high-level diagrams, we need two output channels: one for prices and one for the sliding window. They are both created in the following:

...
  (let [prices-ch         (chan)
        sliding-buffer-ch (map> sliding-buffer (chan))]
...

prices-ch should be self-explanatory; however, sliding-buffer-ch is using a function we haven't encountered before: map>. This is yet another useful channel constructor in core.async. It takes two arguments: a function and a target channel. It returns a channel that applies this function to each value before writing it to the target channel. An example will help illustrate how it works:

(def c (map> sliding-buffer (chan 10)))
(go (doseq [n (range 10)]
      (>! c n)))
(go (doseq [n (range 10)]
      (prn  (vec (<! c)))))

;; [0]
;; [0 1]
;; [0 1 2]
;; [0 1 2 3]
;; [0 1 2 3 4]
;; [1 2 3 4 5]
;; [2 3 4 5 6]
;; [3 4 5 6 7]
;; [4 5 6 7 8]
;; [5 6 7 8 9]

That is, we write a price to the channel and get a sliding window on the other end. Finally, we create the two go blocks containing the side effects. Each one loops for as long as its channel stays open, taking values from it and updating the user interface.
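One caveat for readers on newer versions: map> has since been deprecated in core.async in favor of transducers. Assuming Clojure 1.7 or later and a correspondingly recent core.async release (neither of which matches the versions in this project's project.clj), the same sliding window channel could be sketched by attaching a map transducer to a buffered channel. The window size of 3 and the names below are illustrative.

```clojure
(require '[clojure.core.async :refer [chan >!! <!! close!] :as async])

;; Helpers repeated from the application code so the snippet stands alone.
(defn roll-buffer [buffer val buffer-size]
  (let [buffer (conj buffer val)]
    (if (> (count buffer) buffer-size)
      (pop buffer)
      buffer)))

(defn make-sliding-buffer [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size))))

;; A channel with a transducer must be given a buffer.
(def window-ch (chan 10 (map (make-sliding-buffer 3))))

(doseq [n (range 5)]
  (>!! window-ch n))
(close! window-ch)

;; Drain the channel and turn each queue into a vector for printing.
(def windows (mapv vec (<!! (async/into [] window-ch))))
;; => [[0] [0 1] [0 1 2] [1 2 3] [2 3 4]]
```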

You can see it in action by running the program from the terminal:

$ lein run -m core-async-playground.stock-market