In this section, we’ll explore some common concurrency patterns for
executing all the iterations of a loop in parallel.
We’ll consider the problem of producing thumbnail-size images from a
set of full-size ones.
The gopl.io/ch8/thumbnail package provides an ImageFile function that can scale a single image. We won’t show its implementation, but it can be downloaded from gopl.io.
```go
package thumbnail

// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
```
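The doc comment pins down the naming convention for the output file. The sketch below reproduces only that convention, assuming nothing beyond what the comment states: the thumbnail lives in the same directory and ".thumb" is inserted before the original extension. The helper thumbName is hypothetical, is not part of gopl.io/ch8/thumbnail, and performs no image scaling.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// thumbName is a hypothetical helper, not part of gopl.io/ch8/thumbnail.
// It derives the output name described in ImageFile's doc comment:
// same directory, with ".thumb" inserted before the extension,
// so "foo.jpg" becomes "foo.thumb.jpg".
func thumbName(infile string) string {
	ext := filepath.Ext(infile) // e.g. ".jpg"; empty if there is no extension
	return strings.TrimSuffix(infile, ext) + ".thumb" + ext
}

func main() {
	fmt.Println(thumbName("photos/foo.jpg")) // photos/foo.thumb.jpg
}
```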
The program below loops over a list of image file names and produces a thumbnail for each one:
```go
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
	for _, f := range filenames {
		if _, err := thumbnail.ImageFile(f); err != nil {
			log.Println(err)
		}
	}
}
```
Obviously the order in which we process the files doesn’t matter, since each scaling operation is independent of all the others. Problems like this that consist entirely of subproblems that are completely independent of each other are described as embarrassingly parallel. Embarrassingly parallel problems are the easiest kind to implement concurrently and can enjoy performance that scales linearly with the amount of parallelism, until a shared resource such as the CPUs or the disk becomes the bottleneck.
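To see what parallelism could gain, we can time the sequential loop against a hypothetical stand-in for thumbnail.ImageFile that merely sleeps. The 10ms delay is an arbitrary assumption, not a measurement of real image scaling. Because the iterations run one after another, the elapsed time can be no less than the number of files multiplied by the delay.

```go
package main

import (
	"fmt"
	"time"
)

// delay is an assumed per-image cost used only for this illustration.
const delay = 10 * time.Millisecond

// fakeImageFile is a hypothetical stand-in for thumbnail.ImageFile.
// It simulates the work by sleeping and returns a made-up output name.
func fakeImageFile(infile string) (string, error) {
	time.Sleep(delay)
	return infile + ".thumb", nil
}

// sequentialThumbnails processes the files one at a time, as
// makeThumbnails does, and reports how long the whole loop took.
func sequentialThumbnails(filenames []string) time.Duration {
	start := time.Now()
	for _, f := range filenames {
		if _, err := fakeImageFile(f); err != nil {
			fmt.Println(err)
		}
	}
	return time.Since(start)
}

func main() {
	files := []string{"a.jpg", "b.jpg", "c.jpg", "d.jpg"}
	elapsed := sequentialThumbnails(files)
	lowerBound := time.Duration(len(files)) * delay
	fmt.Println("sequential:", elapsed, "at least n*delay:", elapsed >= lowerBound)
}
```

Each call pays the full delay before the next one starts, which is exactly the latency a concurrent version hopes to overlap.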
Let’s execute all these operations in parallel, thereby hiding the
latency of the file I/O and using multiple CPUs
for the image-scaling computations.
Our first attempt at a concurrent version just adds a go
keyword.
We’ll ignore errors for now and address them later.
```go
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
	for _, f := range filenames {
		go thumbnail.ImageFile(f) // NOTE: ignoring errors
	}
}
```
This version runs really fast—too fast, in fact, since it takes less
time than the original, even when the slice of file names contains only
a single element.
If there’s no parallelism, how can the concurrent version possibly run
faster?
The answer is that makeThumbnails2
returns before it has finished
doing what it was supposed to do.
It starts all the goroutines, one per file name, but doesn’t wait for
them to finish.
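The premature return can be observed deterministically. In the sketch below, makeThumbnails2 takes the scaling function as a parameter so that a hypothetical stub can replace thumbnail.ImageFile. The stub blocks until a release channel is closed, so at the moment makeThumbnails2 returns, no worker can possibly have finished.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// makeThumbnails2 mirrors the incorrect version above, except that the
// scaling function is passed in so that a stub can be used.
// It starts one goroutine per file and returns without waiting.
func makeThumbnails2(filenames []string, imageFile func(string) (string, error)) {
	for _, f := range filenames {
		go imageFile(f) // NOTE: ignoring errors, and never waiting
	}
}

// finishedAtReturn reports how many workers had completed at the moment
// makeThumbnails2 returned. The hypothetical stub blocks until release
// is closed, which makes the answer deterministic: zero.
func finishedAtReturn(filenames []string) int64 {
	var finished int64
	release := make(chan struct{})
	fakeImageFile := func(f string) (string, error) {
		<-release
		atomic.AddInt64(&finished, 1)
		return f + ".thumb", nil
	}
	makeThumbnails2(filenames, fakeImageFile)
	n := atomic.LoadInt64(&finished)
	close(release) // let the workers run; nothing waits for them
	return n
}

func main() {
	n := finishedAtReturn([]string{"a.jpg", "b.jpg", "c.jpg"})
	fmt.Println("workers finished when makeThumbnails2 returned:", n) // 0
}
```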
There is no direct way to wait until a goroutine has finished, but we
can change the inner goroutine to report its completion to the outer
goroutine by sending an event on a shared channel.
Since we know that there are exactly len(filenames)
inner
goroutines, the outer goroutine need only count that many events
before it returns:
```go
// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
	ch := make(chan struct{})
	for _, f := range filenames {
		go func(f string) {
			thumbnail.ImageFile(f) // NOTE: ignoring errors
			ch <- struct{}{}
		}(f)
	}

	// Wait for goroutines to complete.
	for range filenames {
		<-ch
	}
}
```
Notice that we passed the value of f
as an
explicit argument to the literal function instead of using the
declaration of f
from the enclosing for
loop:
```go
for _, f := range filenames {
	go func() {
		thumbnail.ImageFile(f) // NOTE: incorrect!
		// ...
	}()
}
```
Recall the problem of loop variable capture inside an anonymous
function, described in Section 5.6.1.
Above, the single variable f is shared by all the anonymous function values and updated by successive loop iterations. By the time the new goroutines start executing the literal function, the for loop may have updated f and started another iteration or (more likely) finished entirely, so when these goroutines read the value of f, they typically all observe it to have the value of the final element of the slice. By adding an explicit parameter, we ensure that we use the value of f that is current when the go statement is executed. (Go 1.22 changed this rule: each iteration of a for loop now declares a fresh variable, so in modules whose go.mod declares go 1.22 or later the closure above no longer shares a single f. Passing f explicitly is correct under both rules.)
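Before Go 1.22, another common way to get the same effect was to declare a fresh copy of the loop variable inside the body with f := f, so that each closure captures its own variable. The sketch below uses that idiom and collects the value each goroutine observed. The results are sorted because goroutines finish in no particular order. The function name observedValues is illustrative only.

```go
package main

import (
	"fmt"
	"sort"
)

// observedValues starts one goroutine per element and returns the value
// of f that each goroutine saw, sorted for a stable comparison.
func observedValues(filenames []string) []string {
	seen := make(chan string, len(filenames))
	for _, f := range filenames {
		f := f // fresh variable per iteration, captured by the closure below
		go func() {
			seen <- f
		}()
	}
	var out []string
	for range filenames {
		out = append(out, <-seen)
	}
	sort.Strings(out)
	return out
}

func main() {
	fmt.Println(observedValues([]string{"c.jpg", "a.jpg", "b.jpg"})) // [a.jpg b.jpg c.jpg]
}
```

Since Go 1.22 the f := f line is redundant but harmless; the explicit-parameter style used in makeThumbnails3 works in every version.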
What if we want to return values from each worker goroutine to the
main one?
If the call to thumbnail.ImageFile
fails to create a file, it returns
an error.
The next version of makeThumbnails
returns the first error it
receives from any of the scaling operations:
```go
// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
	errors := make(chan error)

	for _, f := range filenames {
		go func(f string) {
			_, err := thumbnail.ImageFile(f)
			errors <- err
		}(f)
	}

	for range filenames {
		if err := <-errors; err != nil {
			return err // NOTE: incorrect: goroutine leak!
		}
	}

	return nil
}
```
This function has a subtle bug.
When it encounters the first non-nil error, it returns the error to
the caller, leaving no goroutine draining the errors
channel.
Each remaining worker goroutine will block forever when it tries to send a value
on that channel, and will never terminate.
This situation, a goroutine leak (§8.4.4), may
cause the whole program to get stuck or to run out of memory.
The simplest solution is to use a buffered channel with sufficient capacity that no worker goroutine will block when it sends a message. (An alternative solution is to create another goroutine to drain the channel while the main goroutine returns the first error without delay.)
The next version of makeThumbnails
uses a buffered channel to
return the names of the generated image files along with any
errors.
```go
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
	type item struct {
		thumbfile string
		err       error
	}

	ch := make(chan item, len(filenames))
	for _, f := range filenames {
		go func(f string) {
			var it item
			it.thumbfile, it.err = thumbnail.ImageFile(f)
			ch <- it
		}(f)
	}

	for range filenames {
		it := <-ch
		if it.err != nil {
			return nil, it.err
		}
		thumbfiles = append(thumbfiles, it.thumbfile)
	}

	return thumbfiles, nil
}
```
Our final version of makeThumbnails, below, returns the total number of bytes occupied by the new files. Unlike the previous versions, however, it receives the file names not as a slice but over a channel of strings, so we cannot predict the number of loop iterations.
To know when the last goroutine has finished (which may not be the
last one to start), we need to increment a counter before each
goroutine starts and decrement it as each goroutine finishes.
This demands a special kind of counter, one that can be safely
manipulated from multiple goroutines and that provides a way to wait
until it becomes zero.
This counter type is known as sync.WaitGroup, and the code below shows how to use it:
```go
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
	sizes := make(chan int64)
	var wg sync.WaitGroup // number of working goroutines
	for f := range filenames {
		wg.Add(1)
		// worker
		go func(f string) {
			defer wg.Done()
			thumb, err := thumbnail.ImageFile(f)
			if err != nil {
				log.Println(err)
				return
			}
			info, err := os.Stat(thumb)
			if err != nil { // a nil info would panic on Size()
				log.Println(err)
				return
			}
			sizes <- info.Size()
		}(f)
	}

	// closer
	go func() {
		wg.Wait()
		close(sizes)
	}()

	var total int64
	for size := range sizes {
		total += size
	}
	return total
}
```
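To exercise this structure without real images, the sketch below replaces the pair thumbnail.ImageFile plus os.Stat with a hypothetical stub that reports a size directly: 100 bytes per character of the input name, an arbitrary stand-in. The stub fails for names ending in ".bad", so the defer wg.Done() path for errors is exercised too. A small helper, feed, sends the names on a channel and closes it, since the loop ranges over the channel until it is closed.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"strings"
	"sync"
)

// fakeThumbSize is a hypothetical stand-in for thumbnail.ImageFile + os.Stat:
// it returns an invented size in bytes, or an error for names ending in ".bad".
func fakeThumbSize(infile string) (int64, error) {
	if strings.HasSuffix(infile, ".bad") {
		return 0, errors.New("cannot decode " + infile)
	}
	return int64(100 * len(infile)), nil
}

// totalSize follows the structure of makeThumbnails6.
func totalSize(filenames <-chan string) int64 {
	sizes := make(chan int64)
	var wg sync.WaitGroup // number of working goroutines
	for f := range filenames {
		wg.Add(1) // before the go statement, so Wait cannot miss this worker
		go func(f string) {
			defer wg.Done() // runs on the error path as well
			size, err := fakeThumbSize(f)
			if err != nil {
				log.Println(err)
				return
			}
			sizes <- size
		}(f)
	}

	// closer: close sizes once every worker has called Done.
	go func() {
		wg.Wait()
		close(sizes)
	}()

	var total int64
	for size := range sizes {
		total += size
	}
	return total
}

// feed sends each name on a new channel and then closes it.
func feed(names ...string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, n := range names {
			ch <- n
		}
	}()
	return ch
}

func main() {
	// "a.jpg" and "bb.jpg" contribute 500 + 600 bytes; "c.bad" fails.
	fmt.Println(totalSize(feed("a.jpg", "bb.jpg", "c.bad"))) // 1100
}
```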
Note the asymmetry in the Add and Done methods. Add, which increments the counter, must be called before the worker goroutine starts, not within it; otherwise we would not be sure that the Add happens before the “closer” goroutine calls Wait. Also, Add takes a parameter, but Done does not; it’s equivalent to Add(-1).
We use defer
to ensure that the counter is decremented even in
the error case.
The structure of the code above is a common and idiomatic pattern for
looping in parallel when we don’t know the number of iterations.
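When the number of workers is known before any of them starts, a single call Add(n) ahead of the loop does the same job as n calls to Add(1) and still happens before Wait. The sketch below assumes a trivial squaring job in place of image scaling. Its results channel is buffered with one slot per worker, which is why, unlike in makeThumbnails6, it is safe here to call Wait before receiving: no worker can block on its send.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares squares each input in its own goroutine and adds up the results.
func sumOfSquares(nums []int) int {
	results := make(chan int, len(nums)) // buffered: workers never block
	var wg sync.WaitGroup
	wg.Add(len(nums)) // one increment per worker, made before any of them starts
	for _, n := range nums {
		go func(n int) {
			defer wg.Done() // same as wg.Add(-1)
			results <- n * n
		}(n)
	}
	wg.Wait()      // safe here: the buffer holds every result
	close(results) // no more sends can happen after Wait returns
	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4})) // 30
}
```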
The sizes
channel carries each file size back to the main
goroutine, which receives them using a range
loop and computes
the sum.
Observe how we create a closer goroutine that waits for the workers to finish before closing the sizes channel. These two operations, wait and close, must be concurrent with the loop over sizes. Consider the alternatives. If the wait operation were placed in the main goroutine before the loop, it could never end, because any worker with a size to report would block forever sending on the unbuffered sizes channel, with no receiver to take the value. If it were placed after the loop, it would be unreachable, because with nothing closing the channel, the loop would never terminate.
Figure 8.5 illustrates the sequence of events in
the makeThumbnails6
function.
The vertical lines represent goroutines.
The thin segments indicate sleep, the thick segments activity.
The diagonal arrows indicate events that synchronize one
goroutine with another.
Time flows down.
Notice how the main goroutine spends most of its time in the
range
loop asleep, waiting for a worker to send a value or the
closer to close the channel.
Exercise 8.4:
Modify the reverb2
server to use a sync.WaitGroup
per
connection to count the number of active echo
goroutines.
When it falls to zero, close the write half of the TCP connection as
described in Exercise 8.3.
Verify that your modified netcat3
client from that exercise
waits for the final echoes of multiple concurrent shouts, even after the
standard input has been closed.
Exercise 8.5: Take an existing CPU-bound sequential program, such as the Mandelbrot program of Section 3.3 or the 3-D surface computation of Section 3.2, and execute its main loop in parallel using channels for communication. How much faster does it run on a multiprocessor machine? What is the optimal number of goroutines to use?