9.7 Example: Concurrent Non-Blocking Cache

In this section, we’ll build a concurrent non-blocking cache, an abstraction that solves a problem that arises often in real-world concurrent programs but is not well addressed by existing libraries. This is the problem of memoizing a function, that is, caching the result of a function so that it need be computed only once. Our solution will be concurrency-safe and will avoid the contention associated with designs based on a single lock for the whole cache.

We’ll use the httpGetBody function below as an example of the type of function we might want to memoize. It makes an HTTP GET request and reads the response body. Calls to this function are relatively expensive, so we’d like to avoid repeating them unnecessarily.

func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

The final line hides a minor subtlety. ReadAll returns two results, a []byte and an error, but since these are assignable to the declared result types of httpGetBody (interface{} and error, respectively), we can return the result of the call without further ado. We chose this return type for httpGetBody so that it conforms to the type of functions that our cache is designed to memoize.
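The assignability rule can be tried in isolation. The sketch below is an illustration only: load is a hypothetical stand-in for httpGetBody that reads from a string rather than the network, and it uses io.ReadAll, the equivalent of ioutil.ReadAll in Go 1.16 and later.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// load is a hypothetical stand-in for httpGetBody. It forwards both
// results of io.ReadAll directly, relying on []byte being assignable
// to interface{} and error being assignable to error.
func load(s string) (interface{}, error) {
	return io.ReadAll(strings.NewReader(s))
}

func main() {
	v, err := load("hello")
	if err != nil {
		fmt.Println(err)
		return
	}
	// The dynamic type of v is still []byte, so the caller
	// needs a type assertion to use it as one.
	fmt.Println(len(v.([]byte))) // prints 5
}
```

The price of the general interface{} result type is the type assertion at the call site, which is also what the client code for Memo has to do.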

Here’s the first draft of the cache:

gopl.io/ch9/memo1
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo

// A Memo caches the results of calling a Func.
type Memo struct {
    f     Func
    cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}

A Memo instance holds the function f to memoize, of type Func, and the cache, which is a mapping from strings to results. Each result is simply the pair of results returned by a call to f—a value and an error. We’ll show several variations of Memo as the design progresses, but all will share these basic aspects.

An example of how to use Memo appears below. For each element in a stream of incoming URLs, we call Get, logging the latency of the call and the amount of data it returns:

m := memo.New(httpGetBody)
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
        continue
    }
    fmt.Printf("%s, %s, %d bytes\n",
        url, time.Since(start), len(value.([]byte)))
}

We can use the testing package (the topic of Chapter 11) to systematically investigate the effect of memoization. From the test output below, we see that the URL stream contains duplicates, and that although the first call to (*Memo).Get for each URL takes hundreds of milliseconds, the second request returns the same amount of data in under a millisecond.

$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes

https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok  gopl.io/ch9/memo1   1.257s

This test executes all calls to Get sequentially.
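The same effect can be checked offline by counting calls instead of timing them. The sketch below condenses the memo1 declarations into a single file and wraps a cheap function; countCalls is a hypothetical helper introduced only for this illustration.

```go
package main

import "fmt"

type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type Memo struct {
	f     Func
	cache map[string]result
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// Get is not concurrency-safe; this sketch calls it from one goroutine only.
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

// countCalls (hypothetical) requests each key in turn through a Memo
// and reports how many times the wrapped function actually ran.
func countCalls(keys []string) int {
	calls := 0
	m := New(func(key string) (interface{}, error) {
		calls++
		return len(key), nil
	})
	for _, k := range keys {
		m.Get(k)
	}
	return calls
}

func main() {
	// Five requests, three distinct keys.
	fmt.Println(countCalls([]string{"a", "bb", "a", "bb", "ccc"})) // prints 3
}
```

Because Get stores the whole result, a call that returns an error is memoized just like a successful one.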

Since HTTP requests are a great opportunity for parallelism, let’s change the test so that it makes all requests concurrently. The test uses a sync.WaitGroup to wait until the last request is complete before returning.

m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        defer n.Done() // without this, n.Wait() below would block forever
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
            return
        }
        fmt.Printf("%s, %s, %d bytes\n",
            url, time.Since(start), len(value.([]byte)))
    }(url)
}
n.Wait()

The test runs much faster, but unfortunately it is unlikely to work correctly all the time. We may notice unexpected cache misses, or cache hits that return incorrect values, or even crashes.

Worse, it is likely to work correctly some of the time, so we may not even notice that it has a problem. But if we run it with the -race flag, the race detector (§9.6) often prints a report such as this one:

$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...

Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s

The reference to memo.go:32 tells us that two goroutines have updated the cache map without any intervening synchronization. Get is not concurrency-safe: it has a data race.

28  func (memo *Memo) Get(key string) (interface{}, error) {
29      res, ok := memo.cache[key]
30      if !ok {
31          res.value, res.err = memo.f(key)
32          memo.cache[key] = res
33      }
34      return res.value, res.err
35  }

The simplest way to make the cache concurrency-safe is to use monitor-based synchronization. All we need to do is add a mutex to the Memo, acquire the mutex lock at the start of Get, and release it before Get returns, so that the two cache operations occur within the critical section:

gopl.io/ch9/memo2
type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}

Now the race detector is silent, even when running the tests concurrently. Unfortunately this change to Memo reverses our earlier performance gains. By holding the lock for the duration of each call to f, Get serializes all the I/O operations we intended to parallelize. What we need is a non-blocking cache, one that does not serialize calls to the function it memoizes.
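One way to observe the serialization without a network is to record how many calls to f overlap in time. The sketch below condenses the memo2 declarations and wraps a function that sleeps briefly to simulate slow I/O; peakCalls and its counters are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// Get holds the lock across the call to f, as in memo2.
func (memo *Memo) Get(key string) (interface{}, error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()
	return res.value, res.err
}

// peakCalls (hypothetical) requests every key concurrently and reports
// the largest number of calls to f that were in progress at once.
func peakCalls(keys []string) int {
	var mu sync.Mutex // guards cur and peak
	cur, peak := 0, 0
	m := New(func(key string) (interface{}, error) {
		mu.Lock()
		cur++
		if cur > peak {
			peak = cur
		}
		mu.Unlock()
		time.Sleep(5 * time.Millisecond) // simulate slow I/O
		mu.Lock()
		cur--
		mu.Unlock()
		return key, nil
	})
	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			m.Get(k)
		}(k)
	}
	wg.Wait()
	return peak
}

func main() {
	// Four distinct keys, four goroutines, yet never more than one call at a time.
	fmt.Println(peakCalls([]string{"a", "b", "c", "d"})) // prints 1
}
```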

In the next implementation of Get, below, the calling goroutine acquires the lock twice: once for the lookup, and then a second time for the update if the lookup returned nothing. In between, other goroutines are free to use the cache.

gopl.io/ch9/memo3
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)

        // Between the two critical sections, several goroutines
        // may race to compute f(key) and update the map.
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}

The performance improves again, but now we notice that some URLs are being fetched twice. This happens when two or more goroutines call Get for the same URL at about the same time. Both consult the cache, find no value there, and then call the slow function f. Then both of them update the map with the result they obtained. One of the results is overwritten by the other.
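The duplicated work normally depends on timing, but it can be forced for demonstration purposes. In the sketch below, which condenses the memo3 declarations, the memoized function waits at a barrier until two callers are inside it, so both goroutines are guaranteed to have missed the cache. The helper duplicateCalls and the barrier are assumptions of this illustration, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// Get releases the lock while f runs, as in memo3.
func (memo *Memo) Get(key string) (interface{}, error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	if !ok {
		res.value, res.err = memo.f(key)
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}
	return res.value, res.err
}

// duplicateCalls (hypothetical) makes two goroutines request the same
// key and returns how many times f ran.
func duplicateCalls() int32 {
	var calls int32
	var barrier sync.WaitGroup
	barrier.Add(2)
	m := New(func(key string) (interface{}, error) {
		atomic.AddInt32(&calls, 1)
		barrier.Done()
		barrier.Wait() // hold until the other caller has also missed the cache
		return key, nil
	})
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Get("k")
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&calls)
}

func main() {
	fmt.Println(duplicateCalls()) // prints 2
}
```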

Ideally we’d like to avoid this redundant work. This feature is sometimes called duplicate suppression. In the version of Memo below, each map element is a pointer to an entry struct. Each entry contains the memoized result of a call to the function f, as before, but it additionally contains a channel called ready. Just after the entry’s result has been set, this channel will be closed, to broadcast (§8.9) to any other goroutines that it is now safe for them to read the result from the entry.

gopl.io/ch9/memo4
type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // This is the first request for this key.
        // This goroutine becomes responsible for computing
        // the value and broadcasting the ready condition.
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()

        e.res.value, e.res.err = memo.f(key)

        close(e.ready) // broadcast ready condition
    } else {
        // This is a repeat request for this key.
        memo.mu.Unlock()

        <-e.ready // wait for ready condition
    }
    return e.res.value, e.res.err
}

A call to Get now involves acquiring the mutex lock that guards the cache map, looking in the map for a pointer to an existing entry, allocating and inserting a new entry if none was found, then releasing the lock. If there was an existing entry, its value is not necessarily ready yet—another goroutine could still be calling the slow function f—so the calling goroutine must wait for the entry’s “ready” condition before it reads the entry’s result. It does this by reading a value from the ready channel, since this operation blocks until the channel is closed.

If there was no existing entry, then by inserting a new “not ready” entry into the map, the current goroutine becomes responsible for invoking the slow function, updating the entry, and broadcasting the readiness of the new entry to any other goroutines that might (by then) be waiting for it.

Notice that the variables e.res.value and e.res.err in the entry are shared among multiple goroutines. The goroutine that creates the entry sets their values, and other goroutines read their values once the “ready” condition has been broadcast. Despite being accessed by multiple goroutines, no mutex lock is necessary. The closing of the ready channel happens before any other goroutine receives the broadcast event, so the write to those variables in the first goroutine happens before they are read by subsequent goroutines. There is no data race.

Our concurrent, duplicate-suppressing, non-blocking cache is complete.
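Duplicate suppression can be checked by counting calls to f while many goroutines ask for the same key at once. The sketch below condenses the memo4 declarations into one file; callsFor is a hypothetical helper and the short sleep merely simulates a slow function.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]*entry
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

func (memo *Memo) Get(key string) (interface{}, error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// First request: compute the value and broadcast readiness.
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()
		e.res.value, e.res.err = memo.f(key)
		close(e.ready)
	} else {
		// Repeat request: wait for the first caller to finish.
		memo.mu.Unlock()
		<-e.ready
	}
	return e.res.value, e.res.err
}

// callsFor (hypothetical) starts n goroutines that all request the
// same key and returns how many times f ran.
func callsFor(n int) int32 {
	var calls int32
	m := New(func(key string) (interface{}, error) {
		atomic.AddInt32(&calls, 1)
		time.Sleep(5 * time.Millisecond) // simulate slow I/O
		return key, nil
	})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Get("k")
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&calls)
}

func main() {
	fmt.Println(callsFor(10)) // prints 1
}
```

Whichever goroutine acquires the mutex first inserts the entry, so every other caller finds it and waits on ready rather than calling f again.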

The implementation of Memo above uses a mutex to guard a map variable that is shared by each goroutine that calls Get. It’s interesting to contrast this design with an alternative one in which the map variable is confined to a monitor goroutine to which callers of Get must send a message.

The declarations of Func, result, and entry remain as before:

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

However, the Memo type now consists of a channel, requests, through which the caller of Get communicates with the monitor goroutine. The element type of the channel is a request. Using this structure, the caller of Get sends the monitor goroutine both the key, that is, the argument to the memoized function, and another channel, response, over which the result should be sent back when it becomes available. This channel will carry only a single value.

gopl.io/ch9/memo5
// A request is a message requesting that the Func be applied to key.
type request struct {
    key      string
    response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }

// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)}
    go memo.server(f)
    return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}
    res := <-response
    return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

The Get method, above, creates a response channel, puts it in the request, sends the request to the monitor goroutine, then immediately receives from the response channel.

The cache variable is confined to the monitor goroutine (*Memo).server, shown below. The monitor reads requests in a loop until the request channel is closed by the Close method. For each request, it consults the cache, creating and inserting a new entry if none was found.

func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests {
        e := cache[req.key]
        if e == nil {
            // This is the first request for this key.
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // call f(key)
        }
        go e.deliver(req.response)
    }
}

func (e *entry) call(f Func, key string) {
    // Evaluate the function.
    e.res.value, e.res.err = f(key)
    // Broadcast the ready condition.
    close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
    // Wait for the ready condition.
    <-e.ready
    // Send the result to the client.
    response <- e.res
}

In a similar manner to the mutex-based version, the first request for a given key becomes responsible for calling the function f on that key, storing the result in the entry, and broadcasting the readiness of the entry by closing the ready channel. This is done by (*entry).call.

A subsequent request for the same key finds the existing entry in the map, waits for the result to become ready, and sends the result through the response channel to the client goroutine that called Get. This is done by (*entry).deliver. The call and deliver methods must be called in their own goroutines to ensure that the monitor goroutine does not stop processing new requests.
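For comparison with the mutex-based version, the same call-counting check can be run against the monitor goroutine design. The sketch below condenses the memo5 declarations into one file; callsFor is a hypothetical helper, and it calls Close once every caller has returned so that the monitor goroutine stops.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

type request struct {
	key      string
	response chan<- result
}

type Memo struct{ requests chan request }

func New(f Func) *Memo {
	memo := &Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry) // confined to this goroutine
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	e.res.value, e.res.err = f(key)
	close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
	<-e.ready
	response <- e.res
}

// callsFor (hypothetical) starts n goroutines that all request the
// same key and returns how many times f ran.
func callsFor(n int) int32 {
	var calls int32
	m := New(func(key string) (interface{}, error) {
		atomic.AddInt32(&calls, 1)
		return len(key), nil
	})
	defer m.Close() // stop the monitor once all callers are done
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Get("key")
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&calls)
}

func main() {
	fmt.Println(callsFor(10)) // prints 1
}
```

No mutex appears anywhere in the Memo itself: the map is touched only by the server goroutine, and the entries are published to other goroutines by closing ready.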

This example shows that it’s possible to build many concurrent structures using either of the two approaches—shared variables and locks, or communicating sequential processes—without excessive complexity.

It’s not always obvious which approach is preferable in a given situation, but it’s worth knowing how they correspond. Sometimes switching from one approach to the other can make your code simpler.

Exercise 9.3: Extend the Func type and the (*Memo).Get method so that callers may provide an optional done channel through which they can cancel the operation (§8.9). The results of a cancelled Func call should not be cached.
