Consuming from a data stream

To consume from an infinite stream of data, let's review the approaches discussed in Chapter 6, Surveying the Landscape.

Futures and promises offer only a one-time asynchronous result and are not suitable in this case. We also need some kind of list into which a data producer can insert ticks and analytics data; a consumer of this data would then be able to asynchronously take from that list. This is conceptually what a queue provides.
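As a quick REPL illustration of that one-shot behavior: a promise can be delivered only once, and any later deliver is ignored, which is why it can't model an ongoing stream of ticks:

```clojure
;; a promise holds exactly one value; a second deliver is a no-op
(def p (promise))
(deliver p 1)
@p            ;; => 1
(deliver p 2) ;; ignored -- the promise is already realized
@p            ;; => still 1
```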

Clojure's persistent queue (clojure.lang.PersistentQueue/EMPTY) is nonblocking. So, if there's no data in the queue, the consumer has to continuously ask for data until it arrives (known as polling). This is not quite what we want; ideally, the consumer should wait (or block) until the data is available for use.
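You can see the nonblocking behavior at the REPL: peeking an empty persistent queue answers immediately with nil, so a consumer would have to keep re-checking:

```clojure
;; an empty persistent queue returns nil immediately -- no waiting
(def q clojure.lang.PersistentQueue/EMPTY)
(peek q)            ;; => nil (the consumer would have to poll again)
(peek (conj q 42))  ;; => 42, once a producer has added data
```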

Java's blocking queues, paired with threads (or another concurrency tool), would mean dealing with the extra memory overhead that threads impose, as well as Java interoperability concerns such as thread monitoring. Reducers simply offer parallelism; they will be useful later as an optimization, but they're not necessary in the first pass of our design.
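For comparison, here is a rough sketch of the Java interop route using java.util.concurrent.LinkedBlockingQueue. The .take call does block until a value arrives, but now we own the thread management ourselves:

```clojure
;; sketch only: the Java interop alternative we're deciding against
(def jq (java.util.concurrent.LinkedBlockingQueue.))
(future (.put jq :tick))  ;; producer runs on a separate (pooled) thread
(.take jq)                ;; blocks the calling thread until :tick arrives
```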

Recall that Clojure implements the concurrency model of Communicating Sequential Processes in a library called core.async. core.async offers queues and concurrency in the form of channels and go blocks. Channels can be easily passed around, and consumers block until data becomes available. This is a good fit for us. We'll set up the remainder of the chapter by first adding the core.async and component libraries to our project:

(defproject edgar "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [clj-time "0.9.0"]
                 [org.clojure/math.numeric-tower "0.0.4"]
                 [org.apache.commons/commons-math3 "3.5"]
                 [org.clojure/core.async "0.1.346.0-17112a-alpha"]
                 [com.stuartsierra/component "0.2.3"]])

When done, the project.clj file should look similar to the preceding code (Leiningen will already have added the :description, :url, and :license entries to defproject). You can now restart your REPL environment.

  1. The first order of business is to create an infinite data stream and pull tick data from it in chunks of 320. This is possible with the functionality we've already created. Recall that in Chapter 5, Traversing Data, Branching, and Conditional Dispatch, our refactored datasource/generate-prices function was used to generate raw price data. We can plug its result into our core/generate-timeseries function from Chapter 2, First Principles and a Useful Way to Think. Then it's just a matter of taking the first 320 elements and keeping the remainder of the list. In your REPL, begin by pulling in the necessary namespaces and aliases. Then, perform the following evaluations:
       (require '[edgar.core :as core]
                '[edgar.analytics :as analytics]
                '[edgar.datasource :as datasource]
                '[clojure.core.async :as async
                  :refer [go go-loop chan close! <! >! <!!]])
    
    (def price-list (datasource/generate-prices))
    (def time-series (core/generate-timeseries price-list))
    
    (def prices (take 320 time-series))
    (def remaining (drop 320 time-series))
  2. With our price data, we can call our analytics functions. All of these functions are available in Chapter 4, Strategies for Calculating and Manipulating Data. The analytics/simple-moving-average function takes this list of 320 ticks and calculates averages over a window of 20 ticks, sliding the window forward until the end of the list is reached. The price list and the results of analytics/simple-moving-average are then fed into the analytics/exponential-moving-average and analytics/bollinger-band functions to produce their results:
    (def sma (analytics/simple-moving-average {} 20 prices))
    (def ema (analytics/exponential-moving-average {} 20 prices sma))
    (def bol (analytics/bollinger-band 20 prices sma))
  3. It's now simple to take all the data we've calculated and save it to disk. We can use the write-data function to write each dataset to a named file:
    (write-data "ticks.edn" prices)
    (write-data "sma.edn" sma)
    (write-data "ema.edn" ema)
    (write-data "bol.edn" bol)
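    The write-data function was defined earlier in the book; if it isn't loaded in your REPL session, a minimal sketch (assuming we just want each data structure serialized as EDN) could be:

    ```clojure
    ;; hypothetical minimal write-data: serialize a data structure to an EDN file
    (defn write-data [fname data]
      (spit fname (pr-str data)))
    ```

    Reading a file back is then just a matter of (clojure.edn/read-string (slurp "ticks.edn")).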
  4. Now that we know there will be a producer and a consumer of data, let's use the core.async library we pulled in. Creating a channel is simple. We'll use the >! function to put data onto this channel within the context of a go block. The go block is the concurrency mechanism that core.async provides; it manages threads and scheduling in the background. After those four pieces of data are on the channel, we can pull them off with the <! function, again within a go block, using <!! to block until each value is delivered. Try evaluating this in your REPL:
    (let [c (chan)]

      ;; put the generated prices and analytics onto a core.async channel
      (go (>! c {:ticks prices}))
      (go (>! c {:sma sma}))
      (go (>! c {:ema ema}))
      (go (>! c {:bol bol}))

      ;; read the data back off the channel
      (println (<!! (go (<! c))))
      (println (<!! (go (<! c))))
      (println (<!! (go (<! c))))
      (println (<!! (go (<! c))))

      (close! c))
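Note that the four go puts are scheduled concurrently, so the maps may print in any order. The go-loop macro we referred to in step 1 gives a tidier consumer: it keeps taking from the channel until close! causes <! to return nil. A sketch, reusing the prices, sma, ema, and bol values from the earlier steps:

```clojure
(let [c (chan)]
  ;; consumer: keep taking until the channel is closed (<! returns nil)
  (go-loop []
    (when-let [v (<! c)]
      (println v)
      (recur)))
  ;; producer: put each result map on the channel, then close it
  (go
    (>! c {:ticks prices})
    (>! c {:sma sma})
    (>! c {:ema ema})
    (>! c {:bol bol})
    (close! c)))
```

Closing the channel from the producer side is what lets the consumer loop terminate cleanly, rather than parking forever on an empty channel.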