Atomic updates and state

It is a common use case to read a data element, execute some logic, and update it with a new value. In a single-threaded program this poses no problem, but in concurrent scenarios the whole read-modify-write sequence must be carried out as a single atomic operation. This case is so common that many processors support it at the hardware level using a special compare-and-swap (CAS) instruction, which is much cheaper than locking. On x86/x64 architectures, the instruction is called compare-and-exchange (CMPXCHG).
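
To make the hazard concrete, here is a minimal sketch (assuming Clojure 1.7+ for volatile!; the var and function names are made up for illustration) in which vswap! performs an ordinary, non-atomic read-modify-write, so concurrent increments get lost:

(def v (volatile! 0))
(defn lossy-inc-all []
  ;; four threads, each incrementing the volatile 100,000 times
  (let [threads (doall (repeatedly 4 #(Thread. (fn [] (dotimes [_ 100000] (vswap! v inc))))))]
    (run! #(.start ^Thread %) threads)
    (run! #(.join ^Thread %) threads)
    ;; typically less than 400000, because concurrent updates overwrite each other
    @v))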

Unfortunately, CAS has a subtle weakness: another thread may change the variable to some other value and then back to the original value before the updating thread performs its compare-and-swap, so the swap succeeds even though the variable was modified in between. This is known as the "ABA" problem. The "Load-linked" (LL) and "Store-conditional" (SC) instruction pair, found on some other architectures, provides an alternative to CAS that does not suffer from the ABA problem. After the LL instruction reads the value from an address, the SC instruction to update the address with a new value goes through only if the address has not been written to since the LL instruction.

Atomic updates in Java

Java ships with a number of built-in, lock-free, thread-safe compare-and-swap abstractions for state management, which live in the java.util.concurrent.atomic package. For the boolean, integer, and long primitive types there are the AtomicBoolean, AtomicInteger, and AtomicLong classes respectively; the latter two also support atomic add/subtract operations. For atomic updates of arbitrary object references there are the AtomicReference, AtomicMarkableReference, and AtomicStampedReference classes. Arrays are supported as well, with AtomicIntegerArray, AtomicLongArray, and AtomicReferenceArray, whose elements can be updated atomically. They are easy to use; here is an example:

(import 'java.util.concurrent.atomic.AtomicReference)
(def ^AtomicReference x (AtomicReference. "foo"))
(.compareAndSet x "foo" "bar")  ; returns true; x now holds "bar"
(import 'java.util.concurrent.atomic.AtomicInteger)
(def ^AtomicInteger y (AtomicInteger. 10))
(.getAndAdd y 5)                ; returns the old value 10; y now holds 15
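
The AtomicStampedReference class mentioned above is the usual JVM remedy for the ABA problem: it pairs the reference with an integer stamp that is expected to change on every update. Here is a minimal sketch (the balance value and the stamp handling are assumptions for illustration):

(import 'java.util.concurrent.atomic.AtomicStampedReference)
;; a reference plus an int stamp; bump the stamp on every update so that an
;; A -> B -> A sequence is still detected as a change
(def ^AtomicStampedReference balance (AtomicStampedReference. 100 0))
(let [stamp (.getStamp balance)
      value (.getReference balance)]
  ;; succeeds only if BOTH the reference and the stamp are unchanged
  (.compareAndSet balance value 90 stamp (inc stamp)))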

However, where and how to use them depends on the update points and the logic in the code. Atomic updates are not guaranteed to be non-blocking, and they are not a substitute for locking in Java; they are a convenience only when the scope is limited to a compare-and-swap operation on a single mutable variable.

Clojure's support for atomic updates

Clojure's atomic update abstraction is called an "atom". It uses AtomicReference under the hood. An operation on AtomicInteger or AtomicLong may be slightly faster than on a Clojure atom, because the former work on primitives, but neither is particularly cheap, due to the compare-and-swap instruction they use on the CPU. The actual cost depends on how frequently the mutation happens and how the JIT compiler optimizes the code. The benefit may not show up until the code has run several hundred thousand times, and an atom that is mutated very frequently incurs extra latency due to retries. Measuring latency under actual (or realistic) load is the best way to tell. An example of using an atom is as follows:

user=> (def a (atom 0))
#'user/a
user=> (swap! a inc)
1
user=> @a
1
user=> (compare-and-set! a 1 5)
true
user=> (reset! a 20)
20

The swap! function provides a notably different style of carrying out atomic updates than the compareAndSet(oldval, newval) method. While compareAndSet() compares and sets the value, returning true on success and false on failure, swap! keeps retrying the update in a loop until it succeeds. This retry-loop style is also a popular pattern among Java developers. However, there is a potential pitfall associated with it: as the concurrency of the updaters gets higher, the performance of the updates may gradually degrade. Then again, high contention on atomic updates raises the question of whether uncoordinated updates were a good idea for the use case at all. The compare-and-set! and reset! functions are pretty straightforward.
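
To see the relationship between the two styles, here is a rough sketch of swap! expressed as a compare-and-set! retry loop (not the actual clojure.core implementation, which lives in Java inside clojure.lang.Atom, but equivalent in spirit):

(defn swap-sketch! [an-atom f & args]
  (loop []
    (let [oldval @an-atom
          newval (apply f oldval args)]
      (if (compare-and-set! an-atom oldval newval)
        newval       ; return the value we managed to install
        (recur)))))  ; another thread won the race; retry with a fresh read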

The function passed to swap! is required to be pure (that is, free of side effects), because it may be retried several times in a loop under contention. If the function is not pure, its side effect may happen as many times as there are retries.
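
A common way to respect this constraint (the orders and place-order! names are made up for illustration) is to keep the update function pure and perform the side effect once, outside swap!, on the returned value:

(def orders (atom []))

(defn place-order! [order]
  ;; conj runs inside swap! and may be retried under contention;
  ;; the println runs exactly once, after the update has succeeded
  (let [new-orders (swap! orders conj order)]
    (println "orders so far:" (count new-orders))
    new-orders))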

It is noteworthy that atoms are not "coordinated": when an atom is used concurrently by different threads, we cannot predict the order in which the operations are applied to it, and hence we cannot guarantee the outcome of any particular interleaving. The code we write around atoms should be designed with this constraint in mind. In many scenarios, atoms may not be a good fit due to this lack of coordination; watch out for that in the program design. Atoms support metadata and a basic validation mechanism via extra arguments. The following examples illustrate these features:

user=> (def a (atom 0 :meta {:foo :bar}))
#'user/a
user=> (meta a)
{:foo :bar}
user=> (def age (atom 0 :validator (fn [x] (if (> x 200) false true))))
#'user/age
user=> (reset! age 200)
200
user=> (swap! age inc)
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.java:33)

The second important thing to note is that atoms support adding and removing watches on them. We will discuss watches later in the chapter.

Faster writes with atom striping

We know that atoms face contention when multiple threads try to update the state at the same time, which implies that atoms perform best when writes are infrequent. There are some use cases, for example metrics counters, where writes need to be fast and frequent, but reads are fewer and can tolerate some inconsistency. For such use cases, instead of directing all updates to a single atom, we can maintain a bunch of atoms where each thread updates a different atom, thus reducing contention. Reads from these atoms cannot be guaranteed to be consistent. Let's develop an example of such a counter:

(def ^:const n-cpu (.availableProcessors (Runtime/getRuntime)))
(def counters (vec (repeatedly n-cpu #(atom 0))))
(defn inc! []
  ;; consider java.util.concurrent.ThreadLocalRandom in Java 7+
  ;; which is faster than Math/random that rand-int is based on
  (let [i (rand-int n-cpu)]
    (swap! (get counters i) inc)))
(defn value []
  (transduce (map deref) + counters))

In the previous example, we created a vector called counters, of the same size as the number of CPU cores in the computer, and initialized each element with an atom holding the initial value 0. The inc! function updates the counter by picking a random atom from counters and incrementing its value by 1. We also assume that rand-int distributes the picks uniformly across all the atoms, so that contention is close to zero. The value function simply walks over all the atoms and adds up their dereferenced values to return the counter value. The example uses clojure.core/rand-int, which relies on java.lang.Math/random (for Java 6 support), to pick the next counter atom at random. Let's see how we can optimize this when using Java 7 or above:

(import 'java.util.concurrent.ThreadLocalRandom)
(defn inc! []
  (let [i (.nextLong (ThreadLocalRandom/current) n-cpu)]
    (swap! (get counters i) inc)))

Here, we import the java.util.concurrent.ThreadLocalRandom class and redefine the inc! function to pick the next random atom using ThreadLocalRandom. Everything else remains the same.
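
As a quick way to exercise the striped counter (demo-counter is a name assumed for this sketch), we can fire increments from several futures and read the total once they all finish:

(defn demo-counter []
  ;; four writers, each incrementing 100,000 times
  (let [writers (doall (repeatedly 4 #(future (dotimes [_ 100000] (inc!)))))]
    (run! deref writers)  ; wait for all the writers to finish
    (value)))             ; => 400000 once all writers are done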
