Monitoring processing with watchers

Another tool that Clojure provides for working with agents is watchers. These are just functions that get a chance to peek at the agent's data. This happens after the validators have successfully run and the new data is set as the agent's state. The state may have changed again by the time the watcher runs, so a watcher should rely on the old and new values it is passed rather than dereferencing the agent. Even so, watchers give you the chance to look at the data and track it separately.

This can help us keep an eye on the data as it's being processed. We can use it to log progress, sample the data for manual validation, or a number of other tasks.
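As a rough illustration of the sampling idea, the sketch below keeps every third state that an agent passes through. The names sample-every, seen, samples, and worker are made up for this example, and it uses atoms for the bookkeeping only to keep the sketch short:

```clojure
;; Hypothetical sampling watcher: keeps every nth new state.
;; The first three parameters are supplied with partial; the last
;; four are the ones add-watch passes to every watch function.
(defn sample-every
  [n seen samples _key _agent _old-state new-state]
  (let [i (swap! seen inc)]
    (when (zero? (mod i n))
      (swap! samples conj new-state))))

(def seen (atom 0))
(def samples (atom []))
(def worker (agent nil))

(add-watch worker :sampler (partial sample-every 3 seen samples))

;; Set the agent's state to 1 through 9, one action at a time.
(doseq [x (range 1 10)]
  (send worker (constantly x)))
(await worker)

@samples
;; => [3 6 9]
```

A logging watcher would look the same, with a println in place of the conj.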

Getting ready

We'll need these dependencies:

(require '[clojure.java.io :as io]
         '[clojure.data.csv :as csv]
         '[clojure.string :as str])
(import '[java.lang Thread])

Also, we'll use the data files from the Managing program complexity with agents recipe, along with the binding to the list of those files, data-files.

From Managing program complexity with STM, we'll use the lazy-read-csv and ->int functions.

How to do it…

In this recipe, we'll add a watcher to keep track of the number of rows that are converted, and we'll add a flag that lets us know when processing is finished:

  1. In order to convert the appropriate fields into integers, we'll need a couple of functions. First, we'll have row-ints, a data binding that indicates which fields need to be converted to integers. Then, we'll define try->int, which will normalize the input and attempt to convert it to an integer. If it fails, it will return the original value. Finally, coerce-row-ints will take a row, attempt to convert the indicated fields to integers, and send the results to the next agent:
    (def row-ints [0 4])
    (defn try->int [v]
      (try
        (->int (str/trim (str/replace v \| \space)))
        (catch Exception ex
          v)))
    (defn coerce-row-ints [_ row indices sink]
      (let [cast-row
            (->> indices
                (mapcat #(vector
                            % (try->int (nth row %))))
                (apply assoc row))]
        (send sink conj cast-row)
        cast-row))
  2. The agent that reads the data will use the function read-row. This is mostly similar to the read-row that we saw in the Maintaining data consistency with validators recipe. The differences are highlighted here:
    (defn read-row [rows caster sink done]
      (if-let [[item & items] (seq rows)]
        (do
          (send caster coerce-row-ints
                item row-ints sink)
          (send *agent* read-row caster sink done)
          items)
        (do
          (dosync (commute done (constantly true)))
          '())))
  3. The function that watches the agent that coerces the data will just update a counter:
    (defn watch-caster
      [counter watch-key watch-agent old-state new-state]
      (when-not (nil? new-state)
        (dosync (commute counter inc))))
  4. We'll define a function that polls until processing is finished:
    (defn wait-for-it [sleep-for ref-var]
      (loop []
        (when-not @ref-var
          (Thread/sleep sleep-for)
          (recur))))
  5. The last function creates all of the agents and references and dispatches their functions. Finally, it blocks until they are all finished and then returns the results. Again, I've highlighted the differences from agent-ints in the Maintaining data consistency with validators recipe, which is very similar:
    (defn watch-processing [input-files]
      (let [reader (agent (seque
                            (mapcat
                              lazy-read-csv
                              input-files)))
            caster (agent nil)
            sink (agent [])
            counter (ref 0)
            done (ref false)]
        (add-watch caster :counter
                   (partial watch-caster counter))
        (send reader read-row caster sink done)
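        (wait-for-it 250 done)
        ;; done only signals that the reader has run out of rows, so
        ;; also wait for the actions still queued on caster, then sink.
        (await caster)
        (await sink)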
        {:results @sink
         :count-watcher @counter}))
  6. When we run the preceding code, we get the data about counts from the watcher:
    user=> (:count-watcher (watch-processing (take 2 data-files)))
    118095
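To see what the coercion in step 1 does without involving any agents, here is a small sketch that applies the same mapcat and assoc approach to a single row. The ->int defined here is only a stand-in for the helper from the earlier recipe, and coerce-fields and the sample row are invented for the illustration:

```clojure
(require '[clojure.string :as str])

;; Stand-in for the ->int helper from the earlier recipe (assumed
;; here to simply parse a string as an integer).
(defn ->int [s]
  (Integer/parseInt s))

(defn try->int [v]
  (try
    (->int (str/trim (str/replace v \| \space)))
    (catch Exception _
      v)))

;; Hypothetical agent-free version of coerce-row-ints: builds a flat
;; sequence of index/value pairs and assocs them back into the row.
(defn coerce-fields [row indices]
  (->> indices
       (mapcat #(vector % (try->int (nth row %))))
       (apply assoc row)))

(coerce-fields ["42" "Alabama" "n/a" "x" " 17|"] [0 4])
;; => [42 "Alabama" "n/a" "x" 17]

(coerce-fields ["abc" "b" "c" "d" "5"] [0 4])
;; => ["abc" "b" "c" "d" 5]
```

The row has to be a vector for assoc to accept integer indices. Fields that can't be parsed are left as the original strings.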

How it works…

This time, instead of associating a validator with the agent that coerces the integers, we called add-watch on it. Each time the agent is updated, the watch function is called with four parameters: a key, the agent, the old state, and the new state. Our watch function also takes a counter reference as its first parameter, which we supply by partially applying the function when we call add-watch.
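The partial application can be tried on its own with a throwaway agent. This is only a sketch: count-updates mirrors watch-caster, and the names update-count and scratch are made up for the example:

```clojure
;; Same shape as watch-caster: the counter comes first so that it can
;; be supplied with partial, followed by the four watch parameters.
(defn count-updates
  [counter watch-key watch-agent old-state new-state]
  (when-not (nil? new-state)
    (dosync (commute counter inc))))

(def update-count (ref 0))
(def scratch (agent nil))

;; partial returns a function of the remaining four parameters,
;; which is the arity that add-watch expects.
(add-watch scratch :counter (partial count-updates update-count))

(send scratch (constantly :first))
(send scratch (constantly :second))
(await scratch)

@update-count
;; => 2
```

Calling (remove-watch scratch :counter) would detach the watcher again, which is what the key is for.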

Once everything has been created, watch-processing just sends the input agent the first message, and then it waits for the processing to finish.
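The waiting step can also be seen in isolation. In this sketch, a future stands in for the reader agent and flips the flag after a short delay; the name finished and the timings are arbitrary choices for the example:

```clojure
;; Same polling loop as in the recipe; the (long ...) call only avoids
;; a reflective call to Thread/sleep.
(defn wait-for-it [sleep-for ref-var]
  (loop []
    (when-not @ref-var
      (Thread/sleep (long sleep-for))
      (recur))))

(def finished (ref false))

;; Stand-in for the reader: sets the flag from another thread.
(future
  (Thread/sleep 100)
  (dosync (ref-set finished true)))

;; Blocks the calling thread, checking the flag every 10 ms.
(wait-for-it 10 finished)

@finished
;; => true
```

Polling like this is simple, but it only notices the change on the next wake-up, so the sleep interval trades responsiveness against wasted checks.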
