In order to consume from an infinite stream of data, let's review the approaches discussed in Chapter 6, Surveying the Landscape.
Futures and promises only offer a one-time asynchronous task and are not suitable in this case. Also, we need some kind of list into which a data producer can insert ticks and analytics data. A consumer of this data would then be able to asynchronously take from the list. This is what a queue provides conceptually.
The clojure.lang.PersistentQueue/EMPTY option is nonblocking. So, if there's no data in the queue, the consumer would have to continuously ask for data until it arrives (known as polling). This is not quite what we want. Ideally, the consumer should wait (or block) until the data is available for use.
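To make the polling problem concrete, here is a minimal sketch. It assumes the shared queue is held in an atom, and the names q, poll-once!, and poll-until-value are hypothetical helpers for illustration, not part of the edgar project.

```clojure
;; peek on an empty persistent queue returns nil immediately; it never waits.
(peek clojure.lang.PersistentQueue/EMPTY)

;; Assumption for this sketch: the shared queue lives in an atom.
(def q (atom clojure.lang.PersistentQueue/EMPTY))

(defn poll-once!
  "Removes and returns the head of the queue, or nil if the queue is empty."
  [queue-atom]
  (loop []
    (let [current @queue-atom]
      (if (compare-and-set! queue-atom current (pop current))
        (peek current)
        (recur)))))

(defn poll-until-value
  "Polls every interval-ms milliseconds until an item shows up."
  [queue-atom interval-ms]
  (loop []
    (if-some [item (poll-once! queue-atom)]
      item
      (do (Thread/sleep (long interval-ms))
          (recur)))))

;; A producer adds a tick a little later; until then the consumer keeps asking.
(future (Thread/sleep 50) (swap! q conj {:last-trade-price 10.5}))
(poll-until-value q 10)
```

The consumer does get its tick, but only by waking up repeatedly to check, which is the wasted work we would like to avoid.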
Java's blocking queues, paired with threads (or another concurrency tool), would mean dealing with the extra memory that threads consume, plus Java interoperability concerns such as thread monitoring. Reducers simply offer parallelism. They would be useful later as an optimization, but they're not necessary in the first pass of our design.
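As a rough sketch of the thread-based alternative, the following uses java.util.concurrent.LinkedBlockingQueue with a future standing in for a dedicated consumer thread. The names bq and consumer, and the tick map, are made up for illustration.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

(def bq (LinkedBlockingQueue.))

;; Consumer: .take blocks its thread until an element is available.
(def consumer (future (.take bq)))

;; Producer: putting a tick on the queue wakes the blocked consumer.
(.put bq {:last-trade-price 10.5})

;; Dereferencing the future yields the tick the consumer took.
@consumer
```

This gives us the blocking behavior we want, but each waiting consumer occupies a real thread, which is the cost described above.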
Recall that for the concurrency model of Communicating Sequential Processes (CSP), Clojure has a separately distributed library called core.async. It offers queues and concurrency in the form of channels and go blocks. Channels can also be easily passed around, and consumers block until data becomes available. These are good options for us. We'll set up the remainder of the chapter by first adding the core.async and component libraries to our project:
(defproject edgar "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [clj-time "0.9.0"]
                 [org.clojure/math.numeric-tower "0.0.4"]
                 [org.apache.commons/commons-math3 "3.5"]
                 [org.clojure/core.async "0.1.346.0-17112a-alpha"]
                 [com.stuartsierra/component "0.2.3"]])
When done, the project.clj file should look similar to the preceding code (Leiningen will already have added the :description, :url, and :license entries to defproject). You can now restart the REPL environment.
The datasource/generate-prices function was used to generate raw price data. We can plug its result into our core/generate-timeseries function from Chapter 2, First Principles and a Useful Way to Think. Now, it's just a matter of taking the first 320 elements and then the remaining part of the list. In your REPL, begin by pulling in the necessary namespaces and aliases. Then, perform the subsequent evaluations:

(require '[edgar.core :as core]
         '[edgar.analytics :as analytics]
         '[edgar.datasource :as datasource]
         '[clojure.core.async :as async :refer [go go-loop chan close! <! >!]])

(def price-list (datasource/generate-prices))
(def time-series (core/generate-timeseries price-list))

(def prices (take 320 time-series))
(def remaining (drop 320 time-series))
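Splitting an infinite sequence this way is safe because take and drop are both lazy. The sketch below uses a hypothetical stand-in, fake-series, in place of the real time series; the :last-trade-time and :last-trade-price keys are assumptions made for illustration.

```clojure
;; Hypothetical stand-in for an infinite time series: one tick per integer.
(def fake-series
  (map (fn [n] {:last-trade-time n
                :last-trade-price (+ 10.0 (mod n 5))})
       (range)))

(def head (take 320 fake-series)) ; lazy, nothing is realized yet
(def tail (drop 320 fake-series)) ; lazy and still infinite

(count head)                    ;; => 320
(:last-trade-time (first tail)) ;; => 320
```

Take care never to print or count the infinite remainder at the REPL; only realize as much of it as you need.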
The analytics/simple-moving-average function takes this list of 320 ticks and calculates their averages in a window of 20 ticks. This window slides forward until the end of the list is reached. The price list and the results from the analytics/simple-moving-average function are then fed into the analytics/exponential-moving-average and analytics/bollinger-band functions to produce these results:

(def sma (analytics/simple-moving-average {} 20 prices))
(def ema (analytics/exponential-moving-average {} 20 prices sma))
(def bol (analytics/bollinger-band 20 prices sma))
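The sliding window itself can be illustrated with partition. This is only a sketch of the idea, not the analytics/simple-moving-average implementation; sma-sketch and the :last-trade-price key are assumptions.

```clojure
(defn sma-sketch
  "Averages :last-trade-price over each sliding window of `window` ticks."
  [window ticks]
  (map (fn [w] (/ (reduce + (map :last-trade-price w)) window))
       (partition window 1 ticks))) ; step of 1 slides the window one tick at a time

(sma-sketch 3 [{:last-trade-price 1}
               {:last-trade-price 2}
               {:last-trade-price 3}
               {:last-trade-price 4}])
;; => (2 3)
```

With a window of 20 over 320 ticks, the same approach would yield 301 averages, one per window position.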
We can then use a write-data function to simply write our data to named files:

(write-data "ticks.edn" prices)
(write-data "sma.edn" sma)
(write-data "ema.edn" ema)
(write-data "bol.edn" bol)
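The body of write-data is not shown in this section. One possible minimal version, offered purely as an assumption, prints the data as a single EDN form with pr-str and writes it with spit. The name write-data-sketch and the file sketch-ticks.edn are hypothetical.

```clojure
(require '[clojure.edn :as edn])

(defn write-data-sketch
  "Writes data to the file fname as a single EDN form."
  [fname data]
  (spit fname (pr-str data)))

(write-data-sketch "sketch-ticks.edn" [{:last-trade-price 10.5}])

;; Reading the file back returns the same data.
(edn/read-string (slurp "sketch-ticks.edn"))
;; => [{:last-trade-price 10.5}]
```

Note that pr-str fully realizes a lazy sequence, so this approach only suits finite data such as our 320 ticks.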
Next, we'll put this data onto a channel from the core.async library. Creating a channel is simple. We'll use the >! function to put data into this channel within the context of the go block. This block is the concurrency mechanism that core.async provides, which manages threads and concurrency in the background. After those four pieces of data are on the channel, we can pull them off with the <! function, again within the context of the go block. The blocking take, async/<!!, then waits on the REPL thread for each go block's result. Try evaluating this in your REPL:

(let [c (chan)]

  ;; put the generated prices and analytics onto the core.async channel
  (go (>! c {:ticks prices}))
  (go (>! c {:sma sma}))
  (go (>! c {:ema ema}))
  (go (>! c {:bol bol}))

  ;; read the data back off the channel
  (println (async/<!! (go (<! c))))
  (println (async/<!! (go (<! c))))
  (println (async/<!! (go (<! c))))
  (println (async/<!! (go (<! c))))

  (close! c))