8.5 Looping in Parallel

In this section, we’ll explore some common concurrency patterns for executing all the iterations of a loop in parallel. We’ll consider the problem of producing thumbnail-size images from a set of full-size ones. The gopl.io/ch8/thumbnail package provides an ImageFile function that can scale a single image. We won’t show its implementation, but it can be downloaded from gopl.io.

gopl.io/ch8/thumbnail
package thumbnail

// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
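
For reference, a minimal sketch of what such a function might look like appears below. This is not the gopl.io implementation, just an illustrative stand-in: it assumes JPEG output, a fixed 128-pixel thumbnail width, and nearest-neighbor sampling, and it requires the image, image/jpeg, os, path/filepath, and strings packages.

// imageFileSketch is a hypothetical stand-in for ImageFile, shown only
// for illustration. It scales the input image to a 128-pixel width by
// nearest-neighbor sampling and writes the result as a JPEG file
// alongside the original.
func imageFileSketch(infile string) (string, error) {
    in, err := os.Open(infile)
    if err != nil {
        return "", err
    }
    defer in.Close()
    src, _, err := image.Decode(in) // importing image/jpeg registers the JPEG decoder
    if err != nil {
        return "", err
    }
    const w = 128 // assumed thumbnail width
    b := src.Bounds()
    h := b.Dy() * w / b.Dx()
    dst := image.NewRGBA(image.Rect(0, 0, w, h))
    for y := 0; y < h; y++ {
        for x := 0; x < w; x++ {
            dst.Set(x, y, src.At(b.Min.X+x*b.Dx()/w, b.Min.Y+y*b.Dy()/h))
        }
    }
    outfile := strings.TrimSuffix(infile, filepath.Ext(infile)) + ".thumb.jpg"
    out, err := os.Create(outfile)
    if err != nil {
        return "", err
    }
    if err := jpeg.Encode(out, dst, nil); err != nil {
        out.Close()
        return "", err
    }
    return outfile, out.Close()
}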

The program below loops over a list of image file names and produces a thumbnail for each one:

gopl.io/ch8/thumbnail
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
    for _, f := range filenames {
        if _, err := thumbnail.ImageFile(f); err != nil {
            log.Println(err)
        }
    }
}

Obviously the order in which we process the files doesn’t matter, since each scaling operation is independent of all the others. Problems like this that consist entirely of subproblems that are completely independent of each other are described as embarrassingly parallel. Embarrassingly parallel problems are the easiest kind to implement concurrently and enjoy performance that scales linearly with the amount of parallelism.

Let’s execute all these operations in parallel, thereby hiding the latency of the file I/O and using multiple CPUs for the image-scaling computations. Our first attempt at a concurrent version just adds a go keyword. We’ll ignore errors for now and address them later.

// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
    for _, f := range filenames {
        go thumbnail.ImageFile(f) // NOTE: ignoring errors
    }
}

This version runs really fast—too fast, in fact, since it takes less time than the original, even when the slice of file names contains only a single element. If there’s no parallelism, how can the concurrent version possibly run faster? The answer is that makeThumbnails2 returns before it has finished doing what it was supposed to do. It starts all the goroutines, one per file name, but doesn’t wait for them to finish.

There is no direct way to wait until a goroutine has finished, but we can change the inner goroutine to report its completion to the outer goroutine by sending an event on a shared channel. Since we know that there are exactly len(filenames) inner goroutines, the outer goroutine need only count that many events before it returns:

// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
    ch := make(chan struct{})
    for _, f := range filenames {
        go func(f string) {
            thumbnail.ImageFile(f) // NOTE: ignoring errors
            ch <- struct{}{}
        }(f)
    }

    // Wait for goroutines to complete.
    for range filenames {
        <-ch
    }
}

Notice that we passed the value of f as an explicit argument to the literal function instead of using the declaration of f from the enclosing for loop:

for _, f := range filenames {
    go func() {
        thumbnail.ImageFile(f) // NOTE: incorrect!
        // ...
    }()
}

Recall the problem of loop variable capture inside an anonymous function, described in Section 5.6.1. Above, the single variable f is shared by all the anonymous function values and updated by successive loop iterations. By the time the new goroutines start executing the literal function, the for loop may have updated f and started another iteration or (more likely) finished entirely, so when these goroutines read the value of f, they all observe it to have the value of the final element of the slice. By adding an explicit parameter, we ensure that we use the value of f that is current when the go statement is executed.
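
An equivalent fix, also used in Section 5.6.1, is to declare an inner variable that shadows the loop variable, so that each iteration’s closure captures its own copy. (In Go 1.22 and later, each loop iteration gets a fresh variable and this pitfall disappears, but both explicit forms are clear and work in every version.)

for _, f := range filenames {
    f := f // inner f shadows the loop variable; each iteration gets its own copy
    go func() {
        thumbnail.ImageFile(f) // NOTE: ignoring errors
        // ...
    }()
}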

What if we want to return values from each worker goroutine to the main one? If the call to thumbnail.ImageFile fails to create a file, it returns an error. The next version of makeThumbnails returns the first error it receives from any of the scaling operations:

// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
    errors := make(chan error)

    for _, f := range filenames {
        go func(f string) {
            _, err := thumbnail.ImageFile(f)
            errors <- err
        }(f)
    }

    for range filenames {
        if err := <-errors; err != nil {
            return err // NOTE: incorrect: goroutine leak!
        }
    }

    return nil
}

This function has a subtle bug. When it encounters the first non-nil error, it returns the error to the caller, leaving no goroutine draining the errors channel. Each remaining worker goroutine will block forever when it tries to send a value on that channel, and will never terminate. This situation, a goroutine leak (§8.4.4), may cause the whole program to get stuck or to run out of memory.

The simplest solution is to use a buffered channel with sufficient capacity that no worker goroutine will block when it sends a message. (An alternative solution is to create another goroutine to drain the channel while the main goroutine returns the first error without delay.)
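
That alternative might look like this sketch of the receiving loop in makeThumbnails4: on the first non-nil error, a new goroutine receives the values still to come, so that no worker blocks on its send.

for i := range filenames {
    if err := <-errors; err != nil {
        // Drain the remaining sends so that no worker blocks forever.
        go func(remaining int) {
            for j := 0; j < remaining; j++ {
                <-errors
            }
        }(len(filenames) - i - 1)
        return err
    }
}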

The next version of makeThumbnails uses a buffered channel to return the names of the generated image files along with any errors.

// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
    type item struct {
        thumbfile string
        err       error
    }

    ch := make(chan item, len(filenames))
    for _, f := range filenames {
        go func(f string) {
            var it item
            it.thumbfile, it.err = thumbnail.ImageFile(f)
            ch <- it
        }(f)
    }

    for range filenames {
        it := <-ch
        if it.err != nil {
            return nil, it.err
        }
        thumbfiles = append(thumbfiles, it.thumbfile)
    }

    return thumbfiles, nil
}

Our final version of makeThumbnails, below, returns the total number of bytes occupied by the new files. Unlike the previous versions, however, it receives the file names not as a slice but over a channel of strings, so we cannot predict the number of loop iterations.

To know when the last goroutine has finished (which may not be the last one to start), we need to increment a counter before each goroutine starts and decrement it as each goroutine finishes. This demands a special kind of counter, one that can be safely manipulated from multiple goroutines and that provides a way to wait until it becomes zero. This counter type is known as sync.WaitGroup, and the code below shows how to use it:

// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

Note the asymmetry in the Add and Done methods. Add, which increments the counter, must be called before the worker goroutine starts, not within it; otherwise we would not be sure that the Add happens before the “closer” goroutine calls Wait. Also, Add takes a parameter, but Done does not; it’s equivalent to Add(-1). We use defer to ensure that the counter is decremented even in the error case. The structure of the code above is a common and idiomatic pattern for looping in parallel when we don’t know the number of iterations.
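
To see why the placement of Add matters, consider this incorrect variant. If the call were moved inside the worker, the main goroutine could finish the range loop and the closer could call Wait while the counter was still zero, closing sizes before some workers had run; a worker that then sent on the closed channel would panic.

for f := range filenames {
    go func(f string) {
        wg.Add(1) // NOTE: incorrect: may run after the closer's Wait returns
        defer wg.Done()
        // ...
    }(f)
}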

The sizes channel carries each file size back to the main goroutine, which receives them using a range loop and computes the sum. Observe how we create a closer goroutine that waits for the workers to finish before closing the sizes channel. These two operations, wait and close, must be concurrent with the loop over sizes. Consider the alternatives: if the wait operation were placed in the main goroutine before the loop, it would never end, and if placed after the loop, it would be unreachable since with nothing closing the channel, the loop would never terminate.
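
Concretely, the first broken alternative looks like this. Since sizes is unbuffered and nothing is yet receiving from it, every worker blocks in its send, the counter never reaches zero, and Wait never returns:

// NOTE: incorrect: deadlock!
wg.Wait()    // blocks forever: no one is receiving from sizes
close(sizes) // never reached
var total int64
for size := range sizes {
    total += size
}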

Figure 8.5. The sequence of events in makeThumbnails6.

Figure 8.5 illustrates the sequence of events in the makeThumbnails6 function. The vertical lines represent goroutines. The thin segments indicate sleep, the thick segments activity. The diagonal arrows indicate events that synchronize one goroutine with another. Time flows down. Notice how the main goroutine spends most of its time in the range loop asleep, waiting for a worker to send a value or the closer to close the channel.

Exercise 8.4: Modify the reverb2 server to use a sync.WaitGroup per connection to count the number of active echo goroutines. When it falls to zero, close the write half of the TCP connection as described in Exercise 8.3. Verify that your modified netcat3 client from that exercise waits for the final echoes of multiple concurrent shouts, even after the standard input has been closed.

Exercise 8.5: Take an existing CPU-bound sequential program, such as the Mandelbrot program of Section 3.3 or the 3-D surface computation of Section 3.2, and execute its main loop in parallel using channels for communication. How much faster does it run on a multiprocessor machine? What is the optimal number of goroutines to use?
