Using a componentized architecture

Now that we've laid out the preceding steps, we'll put the functionalities into a more cohesive package. A queue of data is runtime state that has to be managed. Recall the component architecture that's widely used in the Clojure community. This architecture is the result of an academic paper called Out Of The Tarpit in which the authors have argued that complexity is the biggest nemesis to working software (you can read more at http://shaffner.us/cs/papers/tarpit.pdf). Any incidental complexity should be avoided or removed, and things, such as runtime state, should be cordoned off to a small managed component of the program. This is important because it reflects a componentized architecture that many banks use. It allows for an incremental approach to building out and architecting core banking systems. Many industry participants adopt what's known as a Service-Oriented Architecture (SOA) as part of a suite of best practices. There's a core banking platform or environment that puts basic core banking capabilities in place quickly. Then, gradually, a bank can add components sourced from many different vendors. This is done in a way that ensures open standards and keeps the bank flexible enough to make future investments. The benefits of this approach are such that the bank gets to choose the best tools for its needs. These are tools and technologies that let management ask key questions pertaining to assets, liability, liquidity, risk, and so on, in a timely manner (you can read more at https://en.wikipedia.org/wiki/Core_banking and http://www-05.ibm.com/cz/businesstalks/pdf/Core_Banking_Modernization_Point_of_View.PDF).

Similarly, a component isolates and manages the life cycles of units of code that have a runtime state. So, we can be confident in using this approach. So far, we've entered code into an REPL. Now, you can create a file called src/edgar/component.clj. At the top of our namespace declaration, we're going to require the same libraries as those in the preceding section. However, we're also going to add a component library and some date formatting libraries:

(ns edgar.component
  (:require [clojure.java.io :as io]
            [clojure.core.async :as async :refer [go go-loop chan close! <! >!]]
            [com.stuartsierra.component :as component]
            [clj-time.format :as fmt]
            [edgar.core :as core]
            [edgar.analytics :as analytics]
            [edgar.datasource :as datasource])
  (:import [java.text SimpleDateFormat]))

Now we can begin to write our code. After reviewing component's documentation, we can put up an initial scaffolding with components for the infinite stream of time series data, and consumers of this data:

(defrecord Timeseries []
  component/Lifecycle

  (start [component]
    (let [c (chan)]
      (assoc component :channel c)))

  (stop [component]
    (let [c (:channel component)]
      (close! c)
      (dissoc component :channel))))

(defrecord Consumer [timeseries]
  component/Lifecycle

  (start [component]
    (assoc component :channel (:channel timeseries)))

  (stop [component]
    (dissoc component :channel)))

(defn new-timeseries []
  (map->Timeseries {}))

(defn new-consumer []
  (map->Consumer {}))


(defn build-system []
  (component/system-map
   :tms (new-timeseries)
   :cns (component/using
              (new-consumer)
              {:timeseries :tms})))

(def system (build-system))

When we're done, we will be able to start our system with the component/start function. Often, when we def a symbol, it indicates a simple scalar value or collection. However, when we called component/system-map (within build-system), it returned a Clojure var (one of Clojure's container values that allows controlled updates). So, to change the system var, we'll use alter-var-root to atomically assign a new value, thereby avoiding concurrent updates. Thus, in a scenario where def would redefine an existing var, alter-var-root creates a new value, which is a succession of the previous value. You can now inspect the system map and see the core.async channel that's shared between the time series and consumer components. The clojure.pprint/pprint call neatly prints and formats structured data:

> (alter-var-root #'system component/start)
> (clojure.pprint/pprint system)
{:tms
 {:channel
  #object[clojure.core.async.impl.channels.ManyToManyChannel 0x14d1e55 "clojure.core.async.impl.channels.ManyToManyChannel@14d1e55"]},
 :cns
 {:timeseries
  {:channel
   #object[clojure.core.async.impl.channels.ManyToManyChannel 0x14d1e55 "clojure.core.async.impl.channels.ManyToManyChannel@14d1e55"]},
  :channel
  #object[clojure.core.async.impl.channels.ManyToManyChannel 0x14d1e55 "clojure.core.async.impl.channels.ManyToManyChannel@14d1e55"]}}

We can now add the send-data! and receive-data functions that are used by the time-series and consumer components, respectively:

(defn send-data! [channel time-series]

  (go-loop [prices (take 320 time-series)
            remaining (drop 320 time-series)]

    (let [sma (analytics/simple-moving-average {} 20 prices)
          ema (analytics/exponential-moving-average {} 20 prices sma)
          bol (analytics/bollinger-band 20 prices sma)]

      (>! channel {:ticks prices :sma sma :ema ema :bol bol})
      (Thread/sleep 1000))

    (recur (take 320 remaining)
           (drop 320 remaining))))

(defn receive-data! [channel]

  (go-loop [data (<! channel)]
            (println data)
            (if-not (nil? data)
              (recur (<! channel)))))

(defrecord Timeseries []
  component/Lifecycle

  (start [component]
    (let [c (chan)
          price-list (datasource/generate-prices)
          time-series (core/generate-timeseries price-list)]

      (send-data! c time-series)
      (assoc component :channel c)))

  (stop [component]
    (let [c (:channel component)]
      (close! c)
      (dissoc component :channel))))

(defrecord Consumer [timeseries]
  component/Lifecycle

  (start [component]
    (let [channel (:channel timeseries)]

      (receive-data! channel)
      (assoc component :channel channel)))

  (stop [component]
    (dissoc component :channel)))

The receive-data! expression just prints out what it's consumed. Once we see this working successfully, we can improve receive-data! to write the data out in the manner we laid out in the Devising a persistence strategy section:

(defn receive-data! [channel]

  (go-loop [data (<! channel)]

    (if-not (nil? data)
      (do
        (let [{ticks :ticks sma :sma ema :ema bol :bol} data
              timestamp (-> ticks last :last-trade-time .toString)
              generate-file-name-with-timestamp-fn (fn [fname] (str timestamp "-" fname))]

          (write-data (generate-file-name-with-timestamp-fn "ticks.edn") ticks)
          (write-data (generate-file-name-with-timestamp-fn "sma.edn") sma)
          (write-data (generate-file-name-with-timestamp-fn "ema.edn") ema)
          (write-data (generate-file-name-with-timestamp-fn "bol.edn") bol))
        (recur (<! channel))))))

Run your system for a few seconds by calling (alter-var-root #'system component/start). When you think you've collected enough data, stop your system with (alter-var-root #'system component/stop). When you look at your data/ directory, you should see output files that look something like this:

$ ls data/
.                                   2015-08-09T23:10:43.979Z-ticks.edn  2015-08-09T23:34:30.979Z-bol.edn    2015-08-09T23:49:21.979Z-ema.edn    2015-08-10T00:05:30.979Z-sma.edn    2015-08-10T00:22:00.979Z-ticks.edn  2015-08-10T00:45:34.979Z-bol.edn
..                                  2015-08-09T23:18:25.979Z-bol.edn    2015-08-09T23:34:30.979Z-ema.edn    2015-08-09T23:49:21.979Z-sma.edn    2015-08-10T00:05:30.979Z-ticks.edn  2015-08-10T00:29:57.979Z-bol.edn    2015-08-10T00:45:34.979Z-ema.edn
2015-08-09T23:02:58.979Z-bol.edn    2015-08-09T23:18:25.979Z-ema.edn    2015-08-09T23:34:30.979Z-sma.edn    2015-08-09T23:49:21.979Z-ticks.edn  2015-08-10T00:13:46.979Z-bol.edn    2015-08-10T00:29:57.979Z-ema.edn    2015-08-10T00:45:34.979Z-sma.edn
2015-08-09T23:02:58.979Z-ema.edn    2015-08-09T23:18:25.979Z-sma.edn    2015-08-09T23:34:30.979Z-ticks.edn  2015-08-09T23:57:20.979Z-bol.edn    2015-08-10T00:13:46.979Z-ema.edn    2015-08-10T00:29:57.979Z-sma.edn    2015-08-10T00:45:34.979Z-ticks.edn
2015-08-09T23:02:58.979Z-sma.edn    2015-08-09T23:18:25.979Z-ticks.edn  2015-08-09T23:41:43.979Z-bol.edn    2015-08-09T23:57:20.979Z-ema.edn    2015-08-10T00:13:46.979Z-sma.edn    2015-08-10T00:29:57.979Z-ticks.edn
2015-08-09T23:02:58.979Z-ticks.edn  2015-08-09T23:26:10.979Z-bol.edn    2015-08-09T23:41:43.979Z-ema.edn    2015-08-09T23:57:20.979Z-sma.edn    2015-08-10T00:13:46.979Z-ticks.edn  2015-08-10T00:37:28.979Z-bol.edn
2015-08-09T23:10:43.979Z-bol.edn    2015-08-09T23:26:10.979Z-ema.edn    2015-08-09T23:41:43.979Z-sma.edn    2015-08-09T23:57:20.979Z-ticks.edn  2015-08-10T00:22:00.979Z-bol.edn    2015-08-10T00:37:28.979Z-ema.edn
2015-08-09T23:10:43.979Z-ema.edn    2015-08-09T23:26:10.979Z-sma.edn    2015-08-09T23:41:43.979Z-ticks.edn  2015-08-10T00:05:30.979Z-bol.edn    2015-08-10T00:22:00.979Z-ema.edn    2015-08-10T00:37:28.979Z-sma.edn
2015-08-09T23:10:43.979Z-sma.edn    2015-08-09T23:26:10.979Z-ticks.edn  2015-08-09T23:49:21.979Z-bol.edn    2015-08-10T00:05:30.979Z-ema.edn    2015-08-10T00:22:00.979Z-sma.edn    2015-08-10T00:37:28.979Z-ticks.edn
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.15.221.133