Worker pools

Generally speaking, a worker pool is a set of threads that wait to process the jobs assigned to them. The Apache web server works more or less that way: the main process accepts all incoming requests and forwards them to worker processes to be served. Once a worker process has finished its job, it is ready to serve a new client. Nevertheless, there is a central difference here because our worker pool will use goroutines instead of threads. Additionally, threads do not usually die after serving a request because the cost of ending a thread and creating a new one is too high, whereas goroutines do die after finishing their job.

As you will see shortly, worker pools in Go are implemented with the help of buffered channels, because they allow you to limit the number of goroutines running at the same time.
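The limiting idea can be sketched on its own, before looking at workerPool.go. The following minimal example is not part of workerPool.go: it assumes a hypothetical runLimited() function that uses a buffered channel as a counting semaphore, so that at most limit goroutines do their work at the same time. The names runLimited, sem, running, and peak are illustrative only.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// runLimited starts one goroutine per job, but a buffered channel used as
// a counting semaphore lets at most limit of them run at the same time.
// It returns the highest number of goroutines seen running together.
func runLimited(jobs, limit int) int32 {
    sem := make(chan struct{}, limit) // one slot per allowed goroutine
    var wg sync.WaitGroup
    var running, peak int32

    for i := 0; i < jobs; i++ {
        wg.Add(1)
        sem <- struct{}{} // blocks while all slots are taken
        go func() {
            defer wg.Done()
            defer func() { <-sem }() // free the slot when done

            n := atomic.AddInt32(&running, 1)
            // Record the peak with a compare-and-swap loop.
            for {
                p := atomic.LoadInt32(&peak)
                if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
                    break
                }
            }
            atomic.AddInt32(&running, -1)
        }()
    }
    wg.Wait()
    return peak
}

func main() {
    fmt.Println("peak <= limit:", runLimited(20, 3) <= 3)
}
```

Note that workerPool.go takes a slightly different route: it starts a fixed number of worker() goroutines that share one buffered channel, instead of acquiring a slot per job.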

The next program, workerPool.go, will be presented in five parts. The program will implement a simple task: it will process integer numbers and print their square values using a single goroutine for serving each request. Despite the deceptive simplicity of workerPool.go, the Go code of the program can be easily used as a template for implementing much more difficult tasks.

This is an advanced technique that can help you create server processes in Go that can accept and serve multiple clients using goroutines!

The first part of workerPool.go follows next:

package main 
 
import ( 
    "fmt" 
    "os" 
    "strconv" 
    "sync" 
    "time" 
) 
 
type Client struct { 
    id      int 
    integer int 
} 
 
type Data struct { 
    job    Client 
    square int 
} 

Here you can see a technique that uses the Client structure for assigning a unique id to each request that the program will process. The Data structure is used for grouping the data of a Client with the actual results generated by the program. Put simply, the Client structure holds the input data of each request, whereas the Data structure holds the results of a request.

The second code portion of workerPool.go is shown in the following Go code:

var ( 
    size    = 10 
    clients = make(chan Client, size) 
    data    = make(chan Data, size) 
) 
 
func worker(w *sync.WaitGroup) { 
    for c := range clients { 
        square := c.integer * c.integer 
        output := Data{c, square} 
        data <- output 
        time.Sleep(time.Second) 
    } 
    w.Done() 
} 

The preceding code has two interesting parts. The first part creates three global variables. The clients and data buffered channels are used for getting new client requests and writing the results, respectively. If you want your program to run faster, you can increase the value of the size parameter.

The second part is the implementation of the worker() function, which reads the clients channel in order to get new requests to serve. Once the processing is complete, the result is written to the data channel. The delay that is introduced using the time.Sleep(time.Second) statement is not necessary, but it gives you a better sense of the way the generated output will be printed.

The third part of workerPool.go contains the following Go code:

func makeWP(n int) { 
    var w sync.WaitGroup 
    for i := 0; i < n; i++ { 
         w.Add(1) 
         go worker(&w) 
    } 
    w.Wait() 
    close(data) 
} 
 
func create(n int) { 
    for i := 0; i < n; i++ { 
        c := Client{i, i} 
        clients <- c 
    } 
    close(clients) 
} 

The preceding code implements two functions, named makeWP() and create(). The purpose of the makeWP() function is to generate the required number of worker() goroutines for processing all requests. Although the w.Add(1) function is called in makeWP(), the w.Done() is called in the worker() function once a worker has finished its job.

The purpose of the create() function is to create all requests properly using the Client type and then to write them to the clients channel for processing. Note that the clients channel is read by the worker() function.

The fourth code segment of workerPool.go is as follows:

func main() { 
    fmt.Println("Capacity of clients:", cap(clients)) 
    fmt.Println("Capacity of data:", cap(data)) 
 
    if len(os.Args) != 3 { 
        fmt.Println("Need #jobs and #workers!") 
        os.Exit(1) 
    } 
 
    nJobs, err := strconv.Atoi(os.Args[1]) 
    if err != nil { 
        fmt.Println(err) 
        return 
    } 
 
    nWorkers, err := strconv.Atoi(os.Args[2]) 
    if err != nil { 
        fmt.Println(err) 
        return 
    }

In the preceding code, you read your command-line parameters. First, however, you see that you can use the cap() function to find the capacity of a channel.
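As a small illustration of cap(), the following sketch compares it with len(), which reports how many values are currently waiting in a channel's buffer. The fill() helper is hypothetical and is not part of workerPool.go.

```go
package main

import "fmt"

// fill sends n values into a buffered channel of the given size and
// reports the channel's capacity and the number of queued values.
// It assumes n <= size; otherwise the send would block forever.
func fill(size, n int) (capacity, queued int) {
    ch := make(chan int, size)
    for i := 0; i < n; i++ {
        ch <- i
    }
    return cap(ch), len(ch)
}

func main() {
    c, q := fill(10, 3)
    fmt.Println("capacity:", c, "queued:", q)
    // An unbuffered channel has a capacity of zero.
    fmt.Println("unbuffered capacity:", cap(make(chan int)))
}
```

The capacity of a channel is fixed when the channel is created, whereas the value returned by len() changes as values are sent and received.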

The number of worker() goroutines that is going to be created is always equal to the number of workers given on the command line, even when that number is larger than the size of the clients buffered channel; the size of the channel only limits how many requests can wait in the queue at any one time. Similarly, if the number of jobs is larger than the number of workers, the jobs will be served in smaller sets.

The program allows you to define the number of workers and the number of jobs using its command-line arguments. However, in order to change the size of the clients and data channels, you will need to make changes to the source code of the program.

The remaining code of workerPool.go follows next:

    go create(nJobs)
    finished := make(chan interface{})
    go func() {
        for d := range data {
            fmt.Printf("Client ID: %d\tint: ", d.job.id)
            fmt.Printf("%d\tsquare: %d\n", d.job.integer, d.square)
        }
        finished <- true
    }()

    makeWP(nWorkers)
    fmt.Printf(": %v\n", <-finished)
}

First, you call the create() function to mimic the client requests that you have to process. An anonymous goroutine reads the data channel and prints the output to the screen. The finished channel blocks the program until the anonymous goroutine is done reading the data channel; because only the arrival of a value matters, and not the value itself, the finished channel needs no particular type! Finally, you call the makeWP() function to actually process the requests. The <-finished statement in fmt.Printf() blocks, which means that the program cannot end until somebody writes something to the finished channel. That somebody is the anonymous goroutine of the main() function. Additionally, although the anonymous function writes the value true to the finished channel, you can write false to it and get the same result, which is unblocking the main() function. Try it on your own!
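Because the value sent on such a channel carries no information, a common alternative is a chan struct{} that is closed instead of written to. The following is a minimal sketch of that variation and not the approach used in workerPool.go; the sumAll() function and its variable names are hypothetical.

```go
package main

import "fmt"

// sumAll feeds values to a consumer goroutine and waits for it to finish.
// The done channel carries no data: closing it is the signal.
func sumAll(values []int) int {
    data := make(chan int)
    done := make(chan struct{})
    total := 0

    go func() {
        defer close(done) // unblocks the receiver below
        for v := range data {
            total += v
        }
    }()

    for _, v := range values {
        data <- v
    }
    close(data) // ends the range loop in the consumer
    <-done      // blocks until the consumer has finished
    return total
}

func main() {
    fmt.Println(sumAll([]int{1, 2, 3}))
}
```

Reading total after <-done is safe because the close of done happens before the receive that observes it.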

Executing workerPool.go will generate the following output:

$ go run workerPool.go 15 5
Capacity of clients: 10
Capacity of data: 10
Client ID: 0      int: 0      square: 0
Client ID: 4      int: 4      square: 16
Client ID: 1      int: 1      square: 1
Client ID: 3      int: 3      square: 9
Client ID: 2      int: 2      square: 4
Client ID: 5      int: 5      square: 25
Client ID: 6      int: 6      square: 36
Client ID: 7      int: 7      square: 49
Client ID: 8      int: 8      square: 64
Client ID: 9      int: 9      square: 81
Client ID: 10     int: 10     square: 100
Client ID: 11     int: 11     square: 121
Client ID: 12     int: 12     square: 144
Client ID: 13     int: 13     square: 169
Client ID: 14     int: 14     square: 196
: true

When you want to serve each individual request without expecting an answer from it in the main() function, as happened with workerPool.go, you have fewer things to worry about. A simple way both to use goroutines for processing your requests and to get an answer from them in the main() function is using shared memory or a monitor process that will collect the data instead of just printing it on the screen.
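One possible way to collect the results instead of printing them is sketched next. This is an assumption about how you might do it rather than code from workerPool.go: the squares() function and the result type are hypothetical, and both channels are sized to hold every job so that nothing blocks before the results are gathered.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

// result pairs an input with its computed square.
type result struct {
    input  int
    square int
}

// squares processes inputs with nWorkers goroutines and returns the
// collected results, sorted by input, to the caller.
func squares(inputs []int, nWorkers int) []result {
    jobs := make(chan int, len(inputs))
    results := make(chan result, len(inputs))

    var wg sync.WaitGroup
    for i := 0; i < nWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for n := range jobs {
                results <- result{n, n * n}
            }
        }()
    }

    for _, n := range inputs {
        jobs <- n
    }
    close(jobs)

    wg.Wait()
    close(results)

    collected := make([]result, 0, len(inputs))
    for r := range results {
        collected = append(collected, r)
    }
    // Workers finish in no particular order, so sort before returning.
    sort.Slice(collected, func(i, j int) bool {
        return collected[i].input < collected[j].input
    })
    return collected
}

func main() {
    for _, r := range squares([]int{3, 1, 2}, 2) {
        fmt.Println(r.input, r.square)
    }
}
```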

Finally, the work of the workerPool.go program is much simpler because the worker() function cannot fail. This will not be the case when you have to work over computer networks or with other kinds of resources that can fail.
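When a worker can fail, one common approach is to carry the error alongside the result. The following sketch assumes a hypothetical process() function that rejects negative numbers purely for the sake of illustration; the outcome type, the errNegative value, and runAll() are not part of workerPool.go.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// errNegative is a made-up failure used only for this illustration.
var errNegative = errors.New("negative input")

// outcome holds either a square or the error that prevented computing it.
type outcome struct {
    input  int
    square int
    err    error
}

// process is a stand-in for work that may fail.
func process(n int) (int, error) {
    if n < 0 {
        return 0, errNegative
    }
    return n * n, nil
}

// runAll serves every input with nWorkers goroutines and reports how many
// requests succeeded and how many failed.
func runAll(inputs []int, nWorkers int) (ok, failed int) {
    jobs := make(chan int, len(inputs))
    outcomes := make(chan outcome, len(inputs))

    var wg sync.WaitGroup
    for i := 0; i < nWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for n := range jobs {
                sq, err := process(n)
                outcomes <- outcome{n, sq, err}
            }
        }()
    }

    for _, n := range inputs {
        jobs <- n
    }
    close(jobs)
    wg.Wait()
    close(outcomes)

    for o := range outcomes {
        if o.err != nil {
            failed++
            continue
        }
        ok++
    }
    return ok, failed
}

func main() {
    ok, failed := runAll([]int{1, -2, 3, -4, 5}, 2)
    fmt.Println("ok:", ok, "failed:", failed)
}
```

A failed request does not stop the pool here; whether a failure should cancel the remaining work is a design decision that depends on the task.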
