9

Atomic Memory Operations

Atomic memory operations provide the low-level foundation necessary to implement other synchronization primitives. In general, you can replace all atomic operations of a concurrent algorithm with mutexes and channels. Nevertheless, they are interesting and sometimes confusing constructs, and you should know how they work. If you use them carefully, they can become good tools for code optimization without increasing complexity.

In this chapter, we will explore the following topics:

  • Memory guarantees of atomic memory operations
  • The compare-and-swap operation
  • Practical uses of atomics, including counters, heartbeats/progress meters, cancellations, and detecting change

Technical Requirements

The source code for this particular chapter is available on GitHub at https://github.com/PacktPublishing/Effective-Concurrency-in-Go/tree/main/chapter9.

Memory guarantees

Why do we need separate functions for atomic memory operations? If we write to a variable whose size is less than or equal to the machine word size (which is what the int type is, in practice), such as a=1, wouldn’t that be atomic? The Go memory model does guarantee that such a write is atomic; however, it does not guarantee when other goroutines will see the effects of that write, if ever. Let’s dissect what this statement means.

The first part simply says that if you write to a shared memory location that is at most a machine word in size (such as an int) from one goroutine and read it from another, you will not observe some random value even if there is a race. The memory model guarantees that you will observe either the value from before the write or the value after it (this is not true for all languages). It also means that if the value being written is larger than a machine word, a goroutine reading it may see the underlying object in an inconsistent state. For example, a string value consists of two words: a pointer to the underlying byte array and the string length. The writes to these individual words are atomic, but a racy read may observe a string with a nil array and a nonzero length.

The second part of the statement says that the compiler may optimize or reorder code, or the hardware may execute memory operations out of order, in such a way that another goroutine does not see the effects of the write at the expected time. The standard example that illustrates this point is the following memory race:

func main() {
     var str string
     var done bool
     go func() {
          str = "Done!"
          done = true
     }()
     for !done {
     }
     fmt.Println(str)
}

There is a memory race, because the str and done variables are written in a goroutine and read in another one without explicit synchronization. There are several ways this program can behave:

  • It can print Done!.
  • It can print an empty string. This means that the memory write to done, but not the memory write to str, is seen by the main goroutine.
  • The program may hang. This means that the memory write to done is not seen by the main goroutine.

This is where atomics make a difference. The following program is race-free:

func main() {
     var str atomic.Value
     var done atomic.Bool
     str.Store("")
     go func() {
          str.Store("Done!")
          done.Store(true)
     }()
     for !done.Load() {
     }
     fmt.Println(str.Load())
}

The memory guarantee for atomic operations is as follows. If the effect of an atomic memory write is observed by an atomic read, then the atomic write happened before the atomic read. This also guarantees that the following program will either print 1 or nothing (it will never print 0):

func main() {
     var done atomic.Bool
     var a int
     go func() {
          a = 1
          done.Store(true)
     }()
     if done.Load() {
          fmt.Println(a)
     }
}

Note that there is still a race condition here, but not a memory race. Depending on the execution order of the statements, the main goroutine may or may not see done as true. However, if the main goroutine sees done as true, then it is guaranteed that a=1.

This is one of the reasons why working with atomics can get complicated – memory ordering guarantees are conditional. Atomic operations never block the running goroutine, so the fact that an atomic read returned a certain value does not mean the variable still holds that value while the body of that if statement is running. That’s why you need to be careful when using atomics – it is easy to create race conditions with them, as in the previous program. Remember this – you can always write the same program without atomics.
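
For instance, the earlier str/done example can be written race-free without atomics by signaling completion over a channel – a minimal sketch, assuming the rest of the program stays the same:

func main() {
     var str string
     done := make(chan struct{})
     go func() {
          str = "Done!"
          close(done)
     }()
     <-done // the close of done happens before this receive completes
     fmt.Println(str)
}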

Compare and swap

Any time you test for a condition and act based on the result, you can create a race condition. For example, the following function does not ensure mutual exclusion, despite the use of atomics:

var locked atomic.Bool
func wrongCriticalSectionExample() {
     if !locked.Load() {
          // Another goroutine may lock it now!
          locked.Store(true)
          defer locked.Store(false)
          // This goroutine enters critical section
          // but so can another goroutine
     }
}

The function starts by testing whether the atomic locked value is false. Two goroutines can execute this statement simultaneously, and seeing that it is false, both can enter the critical section and set locked to true. What is needed here is an atomic operation encompassing both the comparison and store operations. That is the compare-and-swap (CAS) operation, which does exactly what its name implies – it compares whether a variable has the expected value and, if so, swaps that value with a given value atomically. If the variable has a different value, nothing is changed – that is, a CAS operation is the following, done atomically:

if *variable == testValue {
    *variable = newValue
    return true
}
return false

Now, you can actually implement a non-blocking mutex:

func criticalSection() {
     if locked.CompareAndSwap(false, true) {
          defer locked.Store(false)
          // critical section
     }
}

This will enter the critical section only if locked is false. If that is the case, it will atomically set locked to true and enter its critical section. Otherwise, it will skip the critical section and continue. Therefore, this can actually be used in place of Mutex.TryLock.
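
For comparison, here is a minimal sketch of the same pattern written with sync.Mutex.TryLock (available since Go 1.18); the mu variable and criticalSectionTryLock name are chosen here for illustration:

var mu sync.Mutex

func criticalSectionTryLock() {
     if mu.TryLock() {
          defer mu.Unlock()
          // critical section
     }
}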

Practical uses of atomics

Here are a few simple, race-free examples of using atomics in different scenarios.

Counters

Atomics can be used as efficient concurrency-safe counters. The following program creates many goroutines, each of which adds 1 to the shared counter. The main goroutine then loops until the counter reaches 10000. Because of the use of atomics, this program is race-free, and it will always terminate by eventually printing 10000:

var count int64
func main() {
     for i := 0; i < 10000; i++ {
          go func() {
               atomic.AddInt64(&count, 1)
          }()
     }
     for {
          v := atomic.LoadInt64(&count)
          fmt.Println(v)
          if v == 10000 {
               break
          }
     }
}
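
The sync/atomic package also provides typed atomics, such as the atomic.Bool and atomic.Value used earlier in this chapter. As a sketch, the same counter could be written with atomic.Int64 (available since Go 1.19), which avoids passing raw pointers around:

var count atomic.Int64

func main() {
     for i := 0; i < 10000; i++ {
          go func() {
               count.Add(1)
          }()
     }
     for count.Load() != 10000 {
     }
     fmt.Println(count.Load())
}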

Heartbeat and progress meter

Sometimes, a goroutine can become unresponsive or not progress as quickly as necessary. A heartbeat utility and progress meter can be used to observe such goroutines. There are a few ways this can be done – for example, the observed goroutine can use non-blocking sends to announce progress, or it can announce its progress by incrementing a shared variable protected by a mutex. Atomics allow us to implement the shared variable scheme without a mutex. This also has the benefit of being observable by multiple goroutines without additional synchronization.

So, let’s define a simple ProgressMeter type containing an atomic value:

type ProgressMeter struct {
     progress int64
}

The following method is used by the observed goroutine to signal its progress. This method simply increments the progress value by 1 atomically:

func (pm *ProgressMeter) Progress() {
     atomic.AddInt64(&pm.progress, 1)
}

The Get method returns the current value of the progress. Note that the load is atomic; without that, there is a chance of missing atomic additions to the variable:

func (pm *ProgressMeter) Get() int64 {
     return atomic.LoadInt64(&pm.progress)
}

An important detail in this implementation is that both the Progress() and Get() methods must use atomic operations. Let’s say you also wanted to record a timestamp of when progress was last reported. You might try adding a timestamp variable and using another atomic read/write:

type WrongProgressMeter struct {
     progress  int64
     timestamp int64
}
func (pm *WrongProgressMeter) Progress() {
     atomic.AddInt64(&pm.progress, 1)
     atomic.StoreInt64(&pm.timestamp, time.Now().UnixNano())
}
func (pm *WrongProgressMeter) Get() (n int64, ts int64) {
     n = atomic.LoadInt64(&pm.progress)
     ts = atomic.LoadInt64(&pm.timestamp)
     return
}

This implementation can read an updated progress value with a stale timestamp. The use of atomics guarantees that the write operations are observed in the order they are written, but it does not guarantee the atomicity of ProgressMeter updates. A correct implementation should use Mutex to ensure atomic updates.
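
A minimal mutex-based sketch might look like the following; TimedProgressMeter is a name chosen here for illustration. Both fields are updated and read under the same lock, so a reader can never observe a new count paired with a stale timestamp:

type TimedProgressMeter struct {
     mu        sync.Mutex
     progress  int64
     timestamp int64
}
func (pm *TimedProgressMeter) Progress() {
     pm.mu.Lock()
     defer pm.mu.Unlock()
     pm.progress++
     pm.timestamp = time.Now().UnixNano()
}
func (pm *TimedProgressMeter) Get() (n int64, ts int64) {
     pm.mu.Lock()
     defer pm.mu.Unlock()
     return pm.progress, pm.timestamp
}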

Now, let’s write a long-running goroutine that uses this progress meter to announce its progress. The following goroutine sleeps for a random duration of up to 120 milliseconds and then records its progress:

func longGoroutine(ctx context.Context, pm *ProgressMeter) {
     for {
          select {
          case <-ctx.Done():
               fmt.Println("Canceled")
               return
          default:
          }
          time.Sleep(time.Duration(rand.Intn(120)) * time.Millisecond)
          pm.Progress()
     }
}

The observer goroutine will expect the observed goroutine to record its progress at least every 100 milliseconds. If that doesn’t happen, it will cancel the context to terminate the observed goroutine. It will terminate itself as well. With this setup, the observed goroutine will eventually take longer than 100 milliseconds between two progress announcements; hence, the program should terminate:

func observer(ctx context.Context, cancel func(), progress *ProgressMeter) {
     tick := time.NewTicker(100 * time.Millisecond)
     defer tick.Stop()
     var lastProgress int64
     for {
          select {
          case <-ctx.Done():
               return
          case <-tick.C:
               p := progress.Get()
               if p == lastProgress {
                    fmt.Println("No progress since last time, canceling")
                    cancel()
                    return
               }
               fmt.Printf("Progress: %d\n", p)
               lastProgress = p
          }
     }
}

We wire it up by creating the long-running goroutine and its observer, using a context and a progress meter:

func main() {
     var progress ProgressMeter
     ctx, cancel := context.WithCancel(
       context.Background())
     defer cancel()
     wg := sync.WaitGroup{}
     wg.Add(1)
     go func() {
          defer wg.Done()
          longGoroutine(ctx, &progress)
     }()
     go observer(ctx, cancel, &progress)
     wg.Wait()
}

Note that we pass the cancel function to the observer so that it can send a cancellation message to the observed goroutine. We will look at another way of doing that next.

Cancellations

We have already looked at using the closing of a channel to signal cancellations. Context implementations use this paradigm to signal cancellations and timeouts. A simple cancellation scheme can also be implemented using atomics:

func CancelSupport() (cancel func(), isCancelled func() bool) {
     v := atomic.Bool{}
     cancel = func() {
          v.Store(true)
     }
     isCancelled = func() bool {
          return v.Load()
     }
     return
}

The CancelSupport function returns two closures – the cancel() function can be called to signal cancellation, and the isCancelled() function can be used to check whether a cancellation request has been registered. Both closures share an atomic bool value. This can be used as follows:

func main() {
     cancel, isCanceled := CancelSupport()
     wg := sync.WaitGroup{}
     wg.Add(1)
     go func() {
          defer wg.Done()
          for {
               time.Sleep(100 * time.Millisecond)
               if isCanceled() {
                    fmt.Println("Cancelled")
                    return
               }
          }
     }()
     time.AfterFunc(5*time.Second, cancel)
     wg.Wait()
}

Detecting change

Let’s say you have a shared variable that can be updated from multiple goroutines. You read this variable, perform some computation, and now you want to update it. However, another goroutine may have already modified that variable after you had your copy. Therefore, you want to update this variable only if someone else did not change it. The following code snippet illustrates this using CAS:

var sharedValue atomic.Pointer[SomeStruct]
func updateSharedValue() {
     myCopy := sharedValue.Load()
     newCopy := computeNewCopy(*myCopy)
     if sharedValue.CompareAndSwap(myCopy, &newCopy) {
          fmt.Println("Set value successful")
     } else {
          fmt.Println("Another goroutine modified the value")
     }
}

This code is race-prone, so you have to be careful. The sharedValue.Load() call atomically returns a pointer to the shared value. If another goroutine modifies the contents of the object that pointer points to, then we have a race. This works only when every goroutine atomically gets the pointer and makes a copy of the underlying data structure before modifying it. Then, we write the modified copy back using CAS, which may fail if another goroutine got there first.
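
If the update must not be dropped when another goroutine wins, a common pattern is to retry the load/compute/CAS sequence in a loop. The following is a minimal sketch of that pattern under the same assumptions as above (SomeStruct and computeNewCopy); the mustUpdateSharedValue name is chosen here for illustration:

func mustUpdateSharedValue() {
     for {
          myCopy := sharedValue.Load()
          newCopy := computeNewCopy(*myCopy)
          if sharedValue.CompareAndSwap(myCopy, &newCopy) {
               return
          }
          // Another goroutine changed the pointer; reload and retry.
     }
}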

Summary

In conclusion, you do not need atomics to implement correct concurrent algorithms. However, they can be nice to have if you identify a concurrency bottleneck. You can replace some simple mutex-protected updates (such as counters) with atomics, provided you also use atomic reads to read them. You can use CAS operations to detect concurrent modifications, but also note that few concurrent algorithms need that.

In the next chapter, we will look at how we can diagnose problems and troubleshoot them in concurrent programs.
