Use Agents for Asynchronous Updates

Some applications have tasks that can proceed independently with minimal coordination between tasks. Clojure agents support this style of task.

Agents have much in common with refs. Like refs, you create an agent by wrapping some piece of initial state:

 (agent initial-state)

Create a counter agent that wraps an initial count of zero:

 (​def​ counter (agent 0))
 -> #​'user/counter

Once you have an agent, you can send the agent a function to update its state. send queues an update-fn to run later, on a thread in a thread pool:

 (send agent update-fn & args)

Sending to an agent is much like commuting a ref. Tell the counter to inc:

 (send counter inc)
 -> #object[clojure.lang.Agent 0x7ae288e1 {:status :ready, :val 1}]

Notice that the call to send doesn’t return the new value of the agent, returning instead the agent itself. That’s because send does not know the new value. After send queues the inc to run later, it returns immediately.

Although send does not know the new value of an agent, the REPL might know. Depending on whether the agent thread or the REPL thread runs first, you might see a 1 or a 0 for the :val in the previous output.

You can check the current value of an agent with deref/@, just as you would a ref. By the time you get around to checking the counter, the inc will almost certainly have completed on the thread pool, raising the value to 1:

 @counter
 -> 1

If the race condition between the REPL and the agent thread bothers you, there is a solution. If you want to be sure that the agent has completed the actions you sent to it, you can call await or await-for:

 (await & agents)
 (await-for timeout-millis & agents)

These functions cause the current thread to block until all actions sent from the current thread or agent have completed. await-for returns nil if the timeout expires and returns a non-nil value otherwise. await has no timeout, so be careful: await is willing to wait forever.

Validating Agents and Handling Errors

Agents have other points in common with refs. They also can take a validation function:

 (agent initial-state options*)
 ; options include:
 ; :validator validate-fn
 ; :meta metadata-map
 ; :error-handler handler-fn
 ; :error-mode mode-keyword (:continue or :fail)

Recreate the counter with a validator that ensures it’s a number:

 (​def​ counter (agent 0 :validator number?))
 -> #​'user/counter

Try to set the agent to a value that’s not a number by passing an update function that ignores the current value and simply returns a string:

 (send counter (​fn​ [_] ​"boo"​))
 -> #object[clojure.lang.Agent 0x3a46c14f {:status :ready, :val 0}]

Everything looks fine (so far) because send still returns immediately. When the agent tries to update itself on a pooled thread, it encounters an exception while applying the action. Agents have two possible error modes—:fail and :continue. If no :error-handler is supplied when the agent is created, the error mode is set to :fail, and any exception that occurs during an action or during validation puts the agent into an exceptional state.

When an agent is in this failed state, it can still be dereferenced and will return the last value from before the failed action. To discover the last error on an agent, call agent-error which returns either the failure or nil if not in a failed state:

 (agent-error counter)
 -> #error {
  :cause ​"Invalid reference state"
  :via [{:type java.lang.IllegalStateException
  :message ​"Invalid reference state"
  :at [clojure.lang.ARef validate ​"ARef.java"​ 33]}]
  :trace
  [[clojure.lang.ARef validate ​"ARef.java"​ 33]
  ... ]}]}

All new actions are queued until the agent is restarted using restart-agent. Once an agent has errors, all subsequent attempts to query the agent return an error. You can make the agent active again by calling restart-agent:

 (restart-agent counter 0)
 -> nil
 
 @counter
 -> 0

If an :error-handler is supplied when the agent is created, the agent will instead be in error mode :continue. When an error occurs, the error handler is invoked and the agent then continues as if no error occurred.

 (​defn​ handler [agent err]
  (println ​"ERR!"​ (.getMessage err)))
 -> #​'user/handler
 
 (​def​ counter2 (agent 0 :validator number? :error-handler handler))
 -> #​'user/counter2
 (send counter2 (​fn​ [_] ​"boo"​))
 | ERR! Invalid reference state
 -> #object[clojure.lang.Agent 0x5ba87f7f {:status :ready, :val 0}]
 
 (send counter2 inc)
 -> #object[clojure.lang.Agent 0x5ba87f7f {:status :ready, :val 0}]
 
 @counter2
 -> 1

Now that you know the basics of agents, let’s use them in conjunction with refs and transactions.

Including Agents in Transactions

Transactions should not have side effects, because Clojure may retry a transaction an arbitrary number of times. However, sometimes you want a side effect when a transaction succeeds. Agents provide a solution. If you send an action to an agent from within a transaction, that action is sent exactly once, if and only if the transaction succeeds.

As an example of where this would be useful, consider an agent that writes to a file when a transaction succeeds. You could combine such an agent with the chat example from commute, to automatically back up chat messages. First, create a backup-agent that stores the filename to write to:

 (​def​ backup-agent (agent ​"output/messages-backup.clj"​))

Then, create a modified version of add-message. The new function add-message-with-backup should do two additional things:

  • Grab the return value of commute, which is the current database of messages, in a let binding.

  • While still inside a transaction, send an action to the backup agent that writes the message database to filename. For simplicity, have the action function return filename so that the agent uses the same filename for the next backup.

 (​defn​ add-message-with-backup [msg]
  (dosync
  (​let​ [snapshot (commute messages conj msg)]
  (send-off backup-agent (​fn​ [filename]
  (spit filename snapshot)
  filename))
  snapshot)))

The new function has one other critical difference: it calls send-off instead of send to communicate with the agent. send-off is a variant of send for actions that expect to block, as a file write might do. send-off actions get their own expandable thread pool. Never send a blocking function, or you may unnecessarily prevent other agents from making progress.

Try adding some messages using add-message-with-backup:

 (add-message-with-backup (->Message ​"John"​ ​"Message One"​))
 -> (#:user.Message{:sender ​"John"​, :text ​"Message One"​})
 
 (add-message-with-backup (->Message ​"Jane"​ ​"Message Two"​))
 -> (#:user.Message{:sender ​"Jane"​, :text ​"Message Two"​}
  #:user.Message{:sender ​"John"​, :text ​"Message One"​})

You can check both the in-memory messages as well as the backup file messages-backup to verify that they contain the same structure.

You could enhance the backup strategy in this example in various ways. You could provide the option to back up less often than on every update or back up only information that has changed since the last backup.

Since Clojure’s STM provides the ACI properties of ACID, and since writing to a file provides the D (“durability”), it’s tempting to think that STM plus a backup agent equals a database. This is not the case. A Clojure transaction promises only to send/send-off an action to the agent; it does not actually perform the action under the ACI umbrella. So for example, a transaction could complete, and then someone could unplug the power cord before the agent writes to the database. The moral is simple. If your problem calls for a real database, use a real database.

The Unified Update Model

As you’ve seen, refs, atoms, and agents all provide functions for updating their state by applying a function to their previous state. This unified model for handling shared state is one of the central concepts of Clojure. The unified functions for each reference type are summarized in the following table.

Update MechanismRef FunctionAtom FunctionAgent Function
Function application

alter

swap!

send-off

Function (commutative)

commute

N/A

N/A

Function (nonblocking)

N/A

N/A

send

Simple setter

ref-set

reset!

N/A

The unified update model is by far the most important way to update refs, atoms, and agents. The ancillary functions, on the other hand, are optimizations and options that stem from the semantics peculiar to each API:

  • The opportunity for the commute optimization arises when coordinating updates. Since only refs provide coordinated updates, commute makes sense only for refs.

  • Updates to refs and atoms take place on the thread they are called on, so they provide no scheduling options. Agents update later, on a thread pool, making blocking/nonblocking a relevant scheduling option.

Clojure’s final reference type, the var, is a different beast entirely. Vars do not participate in the unified update model and are instead used to manage thread-local, private state.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.202.27