Maintaining consistency with ensure

When we use the STM, we are trying to coordinate and maintain consistency between several values, all of which keep changing. However, we'll sometimes also want to maintain consistency with references that we only read, and that therefore wouldn't otherwise be protected by the transaction. We can signal that the STM should include these read-only references in the transaction by using the ensure function.

This helps simplify the data processing system by keeping its data structures synchronized and consistent. The ensure function gives us finer control over what the STM manages.
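As a minimal sketch of the problem ensure solves (the bank-account refs here are illustrative, not part of this recipe), consider a transaction that reads one ref and writes another. Without ensure, a second transaction can commit a change to the ref we read before our transaction commits, silently breaking an invariant that spans both refs (a problem known as write skew). Calling ensure on the read-only ref blocks other transactions from committing changes to it while ours runs:

(def checking (ref 100))
(def savings (ref 100))

;; Invariant: the two balances together must never drop below zero.
(defn withdraw-checking [amount]
  (dosync
    ;; ensure returns the value of savings and also guards it against
    ;; concurrent commits until this transaction finishes, even though
    ;; we never write to it here.
    (when (>= (- (+ @checking (ensure savings)) amount) 0)
      (alter checking - amount))))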

For this recipe, we'll use a slightly contrived example. We'll process a set of text files and compute the frequency of a term as well as the total number of words. We'll do this concurrently, and we'll be able to watch the results get updated as we progress.

For the set of text files, we'll use the Brown corpus. Constructed in the 1960s, this was one of the first digital collections of texts (or corpora) assembled for linguists to use to study language. At that time, its size (one million words) was huge. Today, similar corpora contain 100 million words or more.

Getting ready

We'll need to include the clojure.string library and have easy access to the File class:

(require '[clojure.string :as string])
(import '[java.io File])

We'll also need to download the Brown corpus from http://www.nltk.org/nltk_data/. You can use any large collection of texts, but the Brown corpus lists each word's part of speech in the file, so we'll need to parse it specially. If you use a different corpus, just change the tokenize-brown function, explained in the next section, to work with your texts.

How to do it…

For this recipe, we'll go from preprocessing the data to performing the counts in parallel and looking at the results.

  1. Let's get a sequence of the files to process:
    (def input-files
      (filter #(.isFile %)
              (file-seq (File. "./data/brown"))))
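    Depending on how the corpus was unpacked, this directory may also contain a README or catalog files that we don't want to process. If so, we could filter on the corpus' ca01-style file names instead (a hypothetical refinement, not required by the recipe):

    (def input-files
      (filter #(and (.isFile %)
                    (re-matches #"c[a-z]\d\d" (.getName %)))
              (file-seq (File. "./data/brown"))))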
  2. Now, we'll define some references: finished indicates whether processing is done, total-docs and total-words keep running totals, freqs maps tokens to their frequencies over the whole corpus, and running-report is an agent that holds the current state of the report for the term we're interested in:
    (def finished (ref false))
    (def total-docs (ref 0))
    (def total-words (ref 0))
    (def freqs (ref {}))
    (def running-report
      (agent {:term nil,
              :frequency 0,
              :ratio 0.0}))
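    We can check the initial state of the report at the REPL:

    user=> @running-report
    {:term nil, :frequency 0, :ratio 0.0}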
  3. Let's write the tokenizer. The text in the Brown corpus files looks like this:
    The/at Fulton/np-tl County/nn-tl Grand/jj-tl Jury/nn-tl said/vbd Friday/nr an/at investigation/nn of/in Atlanta's/np$ recent/jj primary/nn election/nn produced/vbd ``/`` no/at evidence/nn ''/'' that/cs any/dti irregularities/nns took/vbd place/nn ./.

    We're not interested in the parts of speech, so our tokenizer will remove them and convert each token to a lowercase keyword:

    (defn tokenize-brown [input-str]
      (->> (string/split input-str #"\s+")
           (map #(first (string/split % #"/" 2)))
           (filter #(> (count %) 0))
           (map string/lower-case)
           (map keyword)))
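    We can try the tokenizer on the beginning of the sample line above:

    user=> (tokenize-brown "The/at Fulton/np-tl County/nn-tl")
    (:the :fulton :county)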
  4. Now, let's write a utility function that increments the frequency map for a token:
    (defn accum-freq [m token]
      (assoc m token (inc (m token 0))))
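    Reducing accum-freq over a sequence of tokens builds up the frequency map:

    user=> (reduce accum-freq {} [:the :jury :the])
    {:the 2, :jury 1}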
  5. We'll use that function in compute-file, which does the primary processing for each file. It also uses send-off to safely queue the next task for this agent:
    (defn compute-file [fs]
      (dosync
        (if-let [[s & ss] (seq fs)]
          (let [tokens (tokenize-brown (slurp s))
                tc (count tokens)
                fq (reduce accum-freq {} tokens)]
            (commute total-docs inc)
            (commute total-words #(+ tc %))
            (commute freqs #(merge-with + % fq))
            (send-off *agent* compute-file)
            ss)
          (do (alter finished (constantly true))
              '()))))
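    We use commute rather than alter for these counters because increments and map merges can be applied in any order. commute lets the STM reapply the update function at commit time instead of retrying the whole transaction when another transaction commits first. With alter, the same update would look like this, but would force a retry on any conflict:

    (alter total-words #(+ tc %))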
  6. Another function will update the report in parallel. It only reads freqs and total-words, so it uses ensure to include both in the transaction. Because the send happens inside dosync, it's dispatched only if and when the transaction commits, so retries won't queue duplicate report updates:
    (defn compute-report [{term :term, :as report}]
      (dosync
        (when-not @finished
          (send *agent* compute-report))
        (let [term-freq (term (ensure freqs) 0)
              tc (ensure total-words)
              r (if (zero? tc)
                  nil
                  (float (/ term-freq tc)))]
          (assoc report
                 :frequency term-freq
                 :ratio r))))
  7. Finally, compute-frequencies gets the entire thing started:
    (defn compute-frequencies [inputs term]
      (let [a (agent inputs)]
        (send running-report #(assoc % :term term))
        (send running-report compute-report)
        (send-off a compute-file)))
  8. To use this, we just call compute-frequencies with the inputs and a term, and then we poll finished and running-report to see how processing is going:
    user=> (compute-frequencies input-files :committee)
    #<Agent@1830f455: (…)>
    user=> [@finished @running-report]
    [false {:frequency 79, :ratio 6.933839E-4, :term :committee}]
    user=> [@finished @running-report]
    [false {:frequency 105, :ratio 2.5916903E-4, :term :committee}]
    user=> [@finished @running-report]
    [false {:frequency 164, :ratio 1.845714E-4, :term :committee}]
    user=> [@finished @running-report]
    [true {:frequency 168, :ratio 1.4468178E-4, :term :committee}]

We can see from the ratio of the frequency of committee to the total word count that the word committee initially occurred relatively often (0.07 percent, which is approximately the frequency of other common words in the overall corpus). By the end of processing, however, its frequency had settled down to about 0.014 percent of the total number of words, which is closer to what we would expect.

How it works…

In this recipe, compute-frequencies triggers everything. It creates a new agent whose state is the sequence of input files, and the compute-file function processes them one by one, updating most of the references along the way.

The compute-report function handles updating the running report. It bases the report on the frequency map and the total word count, but it never changes either of them. To keep everything synchronized, though, it calls ensure on both. Otherwise, there's a chance that the total word count would come from one set of documents and the term frequency from another. That isn't likely here, since only one agent updates those values, but if we decided to have more than one agent processing the files, it would become a real possibility.

To generate a report for a new term without reading all of the files again, we can define this function:

(defn get-report [term]
  (send running-report #(assoc % :term term))
  (send running-report compute-report)
  (await running-report)
  @running-report)
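For example, once processing has finished, we can pull the numbers for a different term without touching the corpus again (the actual counts depend on the corpus, so they're elided here):

user=> (get-report :jury)
{:term :jury, :frequency ..., :ratio ...}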