Managing program complexity with agents

Agents integrate with the STM, and each agent acts a lot like a reference. References let you coordinate updates to multiple pieces of state, but if you're only updating a single piece of state, an agent is a good fit. You use agents by sending them messages (functions that manipulate the agent's state). These messages run asynchronously on a thread pool, although each agent only processes one message at a time.

We create agents with the agent function, and we send messages to them with send and send-off. Whatever the message function returns becomes the agent's new state value. This figure illustrates this process:

[Figure: Managing program complexity with agents]
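As a minimal sketch of that cycle (create an agent, send it messages, wait, then read its state), consider a simple counter. The name counter is made up for illustration and isn't part of the recipe:

```clojure
;; A hypothetical counter agent whose state starts at 0.
(def counter (agent 0))

;; Each message is a function of the current state (plus any extra
;; arguments). Its return value becomes the agent's new state.
(send counter inc)
(send counter + 10)

;; Block until the messages sent so far from this thread have been
;; processed, then read the state.
(await counter)
@counter
;; => 11
```

Messages sent to one agent from the same thread are processed in the order in which they were sent, which is why the result here is predictable.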

For this recipe, we'll again solve the same problem we did in the last recipe, Managing program complexity with STM.

Getting ready

We will include the same dependencies in the project.clj file and the same requirements in the REPL as we did in the Managing program complexity with STM recipe.

For this recipe, I'm going to use the U.S. political campaign finance data from Open Secrets (http://www.opensecrets.org/). You have to register with the site, but once you do that, the data is free to download. Once you've logged in, look for the Bulk Data link. For this, I downloaded the cycles tables for the Campaign Finance Data. I unzipped them to the data/campaign-fin directory. For this recipe, we'll focus on the Political Action Committee (PAC) data. In this case, we'll just find the total amount of campaign contributions per candidate.

We'll use several utility functions from the last recipe: lazy-read-csv and ->int.

How to do it…

To use agents, we just need to add a few functions to the ones from the last recipe:

  1. The first pair is get-cid and get-amount. These take a row from the data file and return the data fields that we're interested in:
    (defn get-cid [row] (nth row 3))
    (defn get-amount [row] (->int (nth row 4)))
  2. We'll now add a function that takes those two values and wraps them into a vector pair:
    (defn get-cid-amount [row]
      [(get-cid row) (get-amount row)])
  3. Now, we'll need a function that indexes those values in a hash map, adding them:
    (defn add-amount-by [m cid amount]
      (assoc m cid (+ amount (get m cid 0))))
  4. Next, we'll define a function that reads the data from a file and accumulates it in an existing hash map:
    (defn read-file-amounts [m filename]
      (reduce #(add-amount-by %1 (first %2) (second %2))
              m
              (map get-cid-amount
                   (lazy-read-csv filename))))
  5. Finally, we'll need a function to make working with agents easier: force-val. It takes an agent and uses await to block until all of the messages sent to it so far have been processed. Then it dereferences the agent. This function will allow us to thread a series of operations on the agents:
    (defn force-val
      [a]
      (await a)
      @a)
  6. Now that everything's in place, here is the function that controls the process:
    (defn main [data-files agent-count]
      (let [agents (map agent (repeat agent-count {}))]
        (dorun
          (map #(send %1 read-file-amounts %2)
               (cycle agents)
               data-files))
        (apply merge-with + (map force-val agents))))

And we can see this in action:

user=> (def data-files ["data/campaign-fin/pacs90.txt"
                        "data/campaign-fin/pacs92.txt"
                        "data/campaign-fin/pacs94.txt"
                        "data/campaign-fin/pacs96.txt"
                        "data/campaign-fin/pacs98.txt"
                        "data/campaign-fin/pacs00.txt"
                        "data/campaign-fin/pacs02.txt"
                        "data/campaign-fin/pacs04.txt"
                        "data/campaign-fin/pacs06.txt"
                        "data/campaign-fin/pacs08.txt"
                        "data/campaign-fin/pacs10.txt"])

user=> (def contribs (main data-files 5))
user=> (contribs "|N00026349|")
280
user=> (contribs "|N00001845|")
134121
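If you don't have the data files at hand, the same pattern can be tried on in-memory data. The following is only a sketch under stated assumptions: sum-pairs, sample-batches, and sum-batches are hypothetical stand-ins for read-file-amounts, data-files, and main, and the sample values are made up:

```clojure
;; Hypothetical stand-in for read-file-amounts: folds [cid amount]
;; pairs into the map m instead of reading them from a file.
(defn sum-pairs [m pairs]
  (reduce (fn [acc [cid amount]]
            (assoc acc cid (+ amount (get acc cid 0))))
          m
          pairs))

;; Made-up sample data. Each inner vector plays the role of one file.
(def sample-batches
  [[["A" 100] ["B" 50]]
   [["A" 25]]
   [["B" 5] ["C" 1]]])

;; Hypothetical stand-in for main.
(defn sum-batches [batches agent-count]
  (let [agents (doall (map agent (repeat agent-count {})))]
    ;; Deal the batches out to the agents round-robin.
    (dorun (map #(send %1 sum-pairs %2) (cycle agents) batches))
    ;; Wait for every agent, then merge the partial totals.
    (apply await agents)
    (apply merge-with + (map deref agents))))

(sum-batches sample-batches 2)
;; => {"A" 125, "B" 55, "C" 1}
```

The totals don't depend on how many agents share the work, because merge-with + adds up the partial sums for any candidate that shows up in more than one agent's map.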

How it works…

Except for force-val, all of the agent-related code is in main. Let's walk through the lines that are of interest:

  • We create the number of agents that we want to use. Each agent is initialized to an empty hash map:
      (let [agents (map agent (repeat agent-count {}))]
  • Next, we send each agent the read-file-amounts function, with a filename as its argument. The agent's current hash map is passed in as the first parameter. We cycle through the agents, dealing the files out round-robin until every file is assigned to an agent:
        (dorun
          (map #(send %1 read-file-amounts %2)
               (cycle agents)
               data-files))
  • Next, we block until each agent is done by calling await, and we dereference each to get its value (both of these take place inside force-val). Once we have the data from each agent, we merge them all together into one hash map, adding up the amounts for any candidate that appears in more than one agent's results:
        (apply merge-with + (map force-val agents))))
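One design choice worth noting: the recipe dispatches with send, which runs actions on a fixed-size thread pool intended for CPU-bound work. For actions that may block, such as slow I/O, Clojure also provides send-off, which uses a pool that can grow. The following sketch shows send-off with a simulated blocking action. The names slow-add and totals are hypothetical, and Thread/sleep only stands in for a slow read:

```clojure
;; Hypothetical action that simulates blocking I/O before updating
;; the agent's state (a map of running totals).
(defn slow-add [m k n]
  (Thread/sleep 50)                  ; stands in for a slow read
  (assoc m k (+ n (get m k 0))))

(def totals (agent {}))

;; send-off takes the same arguments as send, but is meant for
;; actions that may block.
(send-off totals slow-add "A" 1)
(send-off totals slow-add "A" 2)

(await totals)
@totals
;; => {"A" 3}
```

Whether swapping send for send-off in main would help depends on how much time read-file-amounts spends waiting on the disk compared to parsing and adding. That is something to measure rather than assume.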

See also
