Combining agents and STM

Agents by themselves are pretty useful. However, an agent function sometimes needs to coordinate state beyond the agent's own data. If we still want to use the agents' task queuing and concurrency framework in that case, we need to combine agents with the STM: we use send or send-off to queue work and update the agent's own state, and we use dosync with ref-set, alter, or commute inside the agent function to coordinate the other state.

This combination keeps complex state and data coordination problems simple, which is a huge help in managing the complexity of a data processing and analysis system.
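As a minimal sketch of the pattern, assuming nothing from the recipe itself, the following snippet has one agent track its own task count while it updates a shared ref inside a transaction. The names total, worker, and add-to-total are hypothetical and exist only for this illustration.

```clojure
;; Hypothetical names for illustration: total, worker, add-to-total.
(def total (ref 0))     ; shared state, coordinated by the STM
(def worker (agent 0))  ; agent state: number of tasks this agent has run

(defn add-to-total
  "Agent function: adds n to the shared total inside a transaction
  and returns the agent's new state (its task count)."
  [tasks-done n]
  (dosync
    (commute total + n))
  (inc tasks-done))

;; Queue ten tasks on the agent, then block until they have all run.
(doseq [n (range 1 11)]
  (send worker add-to-total n))
(await worker)

[@worker @total]
;; => [10 55]
```

The agent's queue serializes the tasks sent to it, while the transaction protects the ref that other agents or threads could also be updating.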

For this recipe, we'll look at the same problem we did in the Managing program complexity with agents recipe. However, this time we'll structure it a little differently. The final result will be stored in a shared reference, and the agents will update it as they go.

Getting ready

We'll need to use the same dependencies as we did for Managing program complexity with agents, and we'll use two values and functions from that recipe: data-files and read-file-amounts.

How to do it…

For this recipe, we need to define a few functions to work through a queue of input chunks and then block until all of the processing is complete:

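  1. Most of what we use will be from the previous recipe, including read-file-amounts. However, we'll wrap it in another function that takes its output and uses commute to merge the amounts it has just read into the shared hash map held in count-ref. The file is read before the transaction starts because the STM may retry a transaction, and we don't want to repeat the file I/O:
    (defn read-update-amounts [m filename count-ref]
      ;; Read outside the transaction so a retry never re-reads the file.
      (let [file-amounts (read-file-amounts m filename)]
        (dosync
          (commute count-ref
                   #(merge-with + % file-amounts)))))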
  2. We're now ready for the main function. This creates the agents and the shared reference, sends tasks to the agents, and waits for the results before returning them:
    (defn main [data-files agent-count]
      (let [counts (ref {})
            agents (map agent (repeat agent-count {}))]
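        ;; read-update-amounts does blocking file I/O, so queue it with
        ;; send-off rather than send. cycle hands out files round-robin.
        (dorun
          (map #(send-off %1 read-update-amounts %2 counts)
               (cycle agents)
               data-files))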
        (doseq [a agents]
          (await a))
        @counts))
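
The update in step 1 relies on merge-with + being safe to apply in any order, which is what commute requires. The following sketch, using a hypothetical ref named demo-counts and made-up keys, shows how two partial results are folded into one shared map:

```clojure
;; Hypothetical ref and data, for illustration only.
(def demo-counts (ref {}))

;; Each transaction merges one partial result into the shared map.
;; Values under the same key are added; new keys are simply inserted.
(dosync
  (commute demo-counts #(merge-with + % {"a" 1, "b" 2})))
(dosync
  (commute demo-counts #(merge-with + % {"b" 3, "c" 4})))

(= @demo-counts {"a" 1, "b" 5, "c" 4})
;; => true
```

Because addition is commutative and associative, the final map is the same whichever agent commits first, so there is no need for the stricter ordering that alter would enforce.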

How it works…

Using this code works just as we'd expect:

user=> (def amounts (main data-files 8))
user=> (take 5 amounts)
(["" 106549]
 ["|N00032245|" 22750]
 ["|N00027812|" 150]
 ["|N00030973|" 9900]
 ["|N00005656|" 11598514])

This solution uses agents to handle the work, and it uses the STM to manage the shared data structure. The main function first assigns each input file to an agent, cycling through the agents in round-robin order. Each agent then reads its input file and totals the contributions for each candidate. Finally, it takes those totals and uses the STM to merge them into the shared counts.
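
To try this flow without the data files, here is a self-contained sketch under stated assumptions. fake-files and fake-read-file-amounts are hypothetical stand-ins for data-files and read-file-amounts; the stand-in ignores the agent state and returns only one file's totals, which may differ from what the real function in the earlier recipe does. The starred and demo- names are likewise invented to avoid clashing with the recipe's definitions.

```clojure
;; Hypothetical in-memory stand-in for the input files.
(def fake-files
  {"a.csv" {"|N1|" 100, "|N2|" 50}
   "b.csv" {"|N1|" 25}
   "c.csv" {"|N2|" 5, "|N3|" 70}})

;; Stand-in for read-file-amounts (an assumption): ignores the agent
;; state and returns the totals for a single "file".
(defn fake-read-file-amounts [_ filename]
  (get fake-files filename {}))

(defn read-update-amounts* [m filename count-ref]
  (let [file-amounts (fake-read-file-amounts m filename)]
    (dosync
      (commute count-ref #(merge-with + % file-amounts)))))

(defn demo-main [files agent-count]
  (let [counts (ref {})
        agents (doall (map agent (repeat agent-count {})))]
    ;; send is enough here because the stand-in does no blocking I/O.
    (dorun (map #(send %1 read-update-amounts* %2 counts)
                (cycle agents)
                files))
    ;; await accepts several agents and blocks until all are idle.
    (apply await agents)
    @counts))

(= (demo-main (keys fake-files) 2)
   {"|N1|" 125, "|N2|" 55, "|N3|" 70})
;; => true
```

With two agents and three files, one agent processes two files in sequence while the other processes one, and both commit into the same ref.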
