Workers

The previous example is a Google Maps client that uses a time.Ticker channel to limit the rate of requests. The rate limit applies per API key, so let's imagine that we have several API keys from different accounts, which would let us execute more requests in the same amount of time.
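As a quick reminder of that approach, here is a minimal, self-contained sketch of rate limiting with a time.Ticker channel; the rate of two requests per second and the printed output are assumptions for illustration only:

package main

import (
    "fmt"
    "time"
)

func main() {
    // One request is allowed per tick; the ticker channel paces the loop.
    tick := time.NewTicker(time.Second / 2) // assumed rate: 2 requests per second
    defer tick.Stop()

    for i := 0; i < 3; i++ {
        <-tick.C // wait for the next slot before issuing a request
        fmt.Println("request", i, "at", time.Now().Format("15:04:05.000"))
    }
}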

A very typical concurrent pattern is the worker pool. Here, you have a set of clients that can be picked up to process input; different parts of the application can ask to use one of these clients and return it to the pool when they are done.
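As a minimal, self-contained sketch of that borrow-and-return idea (with a hypothetical client type standing in for the real Maps client), a buffered channel can serve as the pool itself: receiving from it borrows a client, and sending on it returns the client:

package main

import "fmt"

// client is a hypothetical placeholder for the real Maps client.
type client struct{ id int }

func main() {
    // The buffered channel acts as the pool.
    pool := make(chan *client, 3)
    for i := 0; i < 3; i++ {
        pool <- &client{id: i}
    }

    c := <-pool                       // borrow a client
    fmt.Println("using client", c.id) // do some work with it
    pool <- c                         // return it when done
}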

We can create more than one client sharing the same channels for requests and responses, with the requests being the coordinates and the results being the responses from the service. Since there is a single channel for responses, we can define a custom type that holds all the information that channel needs to carry:

type result struct {
    Loc    [2]float64
    Result []maps.Result
    Error  error
}

The next step is creating the channels. We read the API keys as a comma-separated list from an environment variable, then create one channel for requests and one for responses. Both channels have a capacity equal to the number of keys in this case, but the code would also work with unbuffered channels. Since we are coordinating with channels alone, we also need another channel, done, which signals when a worker has finished working on its last job:

keys := strings.Split(os.Getenv("MAPS_APIKEYS"), ",")
requests := make(chan [2]float64, len(keys))
results := make(chan result, len(keys))
done := make(chan struct{})

Now, we create a goroutine for each key, in which we define a client that feeds on the requests channel, executes the request, and sends the result to the results channel. When the requests channel is closed, the goroutine exits the range loop and sends a message on the done channel, as shown in the following code:

for i := range keys {
    go func(id int) {
        log.Printf("Starting worker %d with API key %q", id, keys[id])
        client := maps.NewClient(maps.DailyCap, keys[id])
        for j := range requests {
            var r = result{Loc: j}
            log.Printf("w[%d] working on %v", id, j)
            r.Result, r.Error = client.ReverseGeocode(j[0], j[1])
            results <- r
        }
        done <- struct{}{}
    }(i)
}

The locations can be sent to the requests channel sequentially from another goroutine:

go func() {
    for _, l := range [][2]float64{
        {40.4216448, -3.6904040},
        {40.4163111, -3.7047328},
        {40.4123388, -3.7096724},
        {40.4145150, -3.7064412},
    } {
        requests <- l
    }
    close(requests)
}()

We can keep count of the done signals we receive and close the results channel once all the workers are done:

go func() {
    count := 0
    for range done {
        if count++; count == len(keys) {
            break
        }
    }
    close(results)
}()

The done channel is used to count how many workers have finished; once all of them have, the goroutine closes the results channel. This allows us to simply range over the results channel to collect the results:

for r := range results {
    log.Printf("received %v", r)
}

Using a channel is just one of the ways to wait for all the goroutines to finish, and we will see more idiomatic ways of doing it in the next chapter with the sync package.
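As a preview, here is a minimal, self-contained sketch of the same coordination using sync.WaitGroup in place of the done channel and the counting goroutine; a placeholder computation stands in for the geocoding call:

package main

import (
    "log"
    "sync"
)

func main() {
    requests := make(chan int)
    results := make(chan int)

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done() // mark this worker as finished when it returns
            for j := range requests {
                log.Printf("worker %d squaring %d", id, j)
                results <- j * j // placeholder for the real request
            }
        }(i)
    }

    // Close results once every worker has returned,
    // replacing the done channel and the counting goroutine.
    go func() {
        wg.Wait()
        close(results)
    }()

    go func() {
        for i := 0; i < 10; i++ {
            requests <- i
        }
        close(requests)
    }()

    for r := range results {
        log.Println("received", r)
    }
}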
