Building workers

Earlier in this book, we talked about concurrency patterns and a bit about workers. We even brought the workers concept into play in the previous chapter, when we were building our logging systems.

Truly speaking, "worker" is a fairly generic and ambiguous concept, not just in Go, but in general programming and development. In some languages, it's an object/instantiated class, and in others it's a concurrent actor. In functional programming languages, worker is a graduated function return passed to another.

If we go back to the preface, we will see that we have literally used the Go gopher as an example of a worker. In short, a worker is something more complex than a single function call or programmatic action that will perform a task one or more times.

So why are we talking about it now? When we build our channels, we are creating a mechanism to do work. When we have a struct or an interface, we're combining methods and values at a single place, and then doing work using that object as both a mechanism for the work as well as a place to store information about that work.
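
To make this concrete, the following is a minimal, hypothetical sketch of such a worker, separate from the pinging application we're about to build: a struct that stores information about its work and a channel that acts as the mechanism through which work arrives. The CounterWorker name and its fields are purely illustrative:

package main

import (
  "fmt"
)

// CounterWorker is a hypothetical worker: the struct stores state about
// the work, and the Tasks channel is the mechanism through which work arrives.
type CounterWorker struct {
  Name  string
  Count int
  Tasks chan string
}

// Work consumes tasks until the Tasks channel is closed, then signals done.
func (w *CounterWorker) Work(done chan bool) {
  for task := range w.Tasks {
    w.Count++
    fmt.Println(w.Name, "handled:", task, "(total:", w.Count, ")")
  }
  done <- true
}

func main() {
  done := make(chan bool)
  worker := &CounterWorker{Name: "worker-1", Tasks: make(chan string)}
  go worker.Work(done)

  worker.Tasks <- "check google.com"
  worker.Tasks <- "check yahoo.com"
  close(worker.Tasks)
  <-done
}

The goroutine running Work() keeps consuming tasks and updating its own state until its channel is closed, which is exactly the kind of long-lived, stateful behavior we mean when we call something a worker.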

This is particularly useful in application design, as we're able to delegate various elements of an application's functionality to distinct and well-defined workers. Consider, for example, a server pinging application that has specific pieces doing specific things in a self-contained, compartmentalized manner.

We'll attempt to check for server availability via the HTTP package, check the status code and errors, and back off if we find problems with any particular server. You can probably see where this is going—this is the most basic approach to load balancing. But an important design consideration is the way in which we manage our channels.

We'll have a master channel, where all important global transactions should be accumulated and evaluated, but each individual server will also have its own channels for handling tasks that are important only to that individual struct.

The design in the following code can be considered as a rudimentary pipeline, which is roughly akin to the producer/consumer model we talked about in the previous chapters:

package main

import (
  "fmt"
  "net/http"
  "time"
)

const INIT_DELAY = 3000
const MAX_DELAY = 60000
const MAX_RETRIES = 4
const DELAY_INCREMENT = 5000

The preceding code gives the configuration part of the application: how frequently to check servers, the maximum amount of time to back off, and the maximum number of retries before giving up entirely.

The DELAY_INCREMENT value represents how much time, in milliseconds, we will add to the delay between checks each time we discover a problem with a server. With these values, a server that keeps failing is re-checked after roughly 8, 13, 18, and 23 seconds before we stop retrying. Let's take a look at how we describe a server in the following code:

var Servers []Server

type Server struct {
  Name string
  URI string
  LastChecked time.Time
  Status bool
  StatusCode int
  Delay int
  Retries int
  Channel chan bool
}

The preceding struct describes our basic server: its current status and status code, the last time it was checked, the present delay between checks, its retry count, and its own channel for receiving status updates. The checkServerStatus() method below performs the actual check, establishes the new status, and reports it:

func (s *Server) checkServerStatus(sc chan *Server) {
  var previousStatus string

  if s.Status {
    previousStatus = "OK"
  } else {
    previousStatus = "down"
  }

  fmt.Println("Checking Server", s.Name)
  fmt.Println("\tServer was", previousStatus, "on last check at", s.LastChecked)

  response, err := http.Get(s.URI)
  if err != nil {
    fmt.Println("\tError: ", err)
    s.Status = false
    s.StatusCode = 0
  } else {
    fmt.Println(response.Status)
    // Close the body so we don't leak connections between checks.
    response.Body.Close()
    s.StatusCode = response.StatusCode
    s.Status = true
  }

  s.LastChecked = time.Now()
  sc <- s
}

The checkServerStatus() method is the meat and potatoes of our application here. In the main() function, we hand all of our servers to the cycleServers() loop, which invokes this method for each of them; after that, the process becomes self-perpetuating.

If our Status is set to true, we report the previous state to the console as OK (otherwise down), and we set s.StatusCode to either the HTTP status code of the response or 0 if there was a network or other error.
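
One caveat worth flagging: http.Get() uses Go's default HTTP client, which sets no timeout, so a server that accepts connections but never responds could stall a check for a long time. A minimal sketch of an alternative, assuming a reasonably recent Go release (1.3 or later) and a purely illustrative limit of 10 seconds, swaps in a custom client:

package main

import (
  "fmt"
  "net/http"
  "time"
)

// pingClient is a hypothetical alternative to the default client used by
// http.Get(): the Timeout field (available since Go 1.3) bounds the entire
// request, so a hung server cannot stall a check indefinitely.
var pingClient = &http.Client{
  Timeout: 10 * time.Second,
}

func main() {
  // Inside checkServerStatus(), the call would simply become
  // pingClient.Get(s.URI) instead of http.Get(s.URI).
  response, err := pingClient.Get("http://www.google.com")
  if err != nil {
    fmt.Println("Error:", err)
    return
  }
  defer response.Body.Close()
  fmt.Println(response.Status)
}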

Finally, we set the server's last-checked time to time.Now() and pass the Server pointer through the serverChan channel. In the following code, we'll demonstrate how we rotate through our available servers:

func cycleServers(sc chan *Server) {

  for i := 0; i < len(Servers); i++ {
    Servers[i].Channel = make(chan bool)
    go Servers[i].updateDelay(sc)
    go Servers[i].checkServerStatus(sc)
  }

}

This is our initial loop, called from main(). It simply loops through our available servers, creates each server's channel, starts its listening goroutine, and sends the first checkServerStatus() request.

It's worth noting here that the channel owned by a Server never actually gets closed; once a server exhausts its retries, the application simply stops checking it while its goroutine sits idle forever. That's fine for all practical purposes here, but if we had thousands and thousands of servers to check, we would be wasting resources on what essentially amounts to an unclosed channel and a goroutine that never exits. Later, we'll broach the concept of manually stopping goroutines, something we've so far only been able to do indirectly, by halting communication over a channel.
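
As a taste of what that looks like, here is a minimal, hypothetical sketch (separate from our pinging application) of a worker that stops itself when told to; the names quit and doWork are purely illustrative:

package main

import (
  "fmt"
  "time"
)

// doWork loops until something is sent on the quit channel, at which point
// the goroutine returns and its resources can be reclaimed.
func doWork(quit chan bool) {
  for {
    select {
    case <-quit:
      fmt.Println("worker stopping")
      return
    default:
      fmt.Println("working...")
      time.Sleep(500 * time.Millisecond)
    }
  }
}

func main() {
  quit := make(chan bool)
  go doWork(quit)

  time.Sleep(2 * time.Second)
  quit <- true
  // Give the worker a moment to print its shutdown message.
  time.Sleep(100 * time.Millisecond)
}

Let's now take a look at the following code, which controls a server's status and its next steps: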

func (s *Server) updateDelay(sc chan *Server) {
  for {
    select {
    // Block until a status report arrives on this server's channel.
    case msg := <-s.Channel:
      if !msg {
        // The server was reported down, so back off before the next check.
        s.Delay = s.Delay + DELAY_INCREMENT
        s.Retries++
        if s.Delay > MAX_DELAY {
          s.Delay = MAX_DELAY
        }
      } else {
        s.Delay = INIT_DELAY
      }
      // s.Delay is expressed in milliseconds; convert it to a time.Duration.
      newDuration := time.Duration(s.Delay)

      if s.Retries <= MAX_RETRIES {
        fmt.Println("\tWill check server again")
        time.Sleep(newDuration * time.Millisecond)
        s.checkServerStatus(sc)
      } else {
        fmt.Println("\tServer not reachable after", MAX_RETRIES, "retries")
      }
    }
  }
}

This is where each Server listens for changes in its status. checkServerStatus() reports each result on the master serverChan channel, the loop in main() relays the current status to the individual server's own channel, and when the Server struct receives that message, it evaluates it and acts accordingly.

If the message is false, we know that the server was inaccessible for some reason, so the Server adds DELAY_INCREMENT to the delay before its next check, capping it at MAX_DELAY. If the message is true, the server was accessible and the delay is reset to its initial value of INIT_DELAY.

Finally, the goroutine sleeps for the computed duration and then calls checkServerStatus() on itself again, passing along the same serverChan reference that was supplied when it was started from cycleServers(). All of this is tied together in the main() function:

func main() {

  endChan := make(chan bool)
  serverChan := make(chan *Server)

  Servers = []Server{
    {Name: "Google", URI: "http://www.google.com", Status: true, Delay: INIT_DELAY},
    {Name: "Yahoo", URI: "http://www.yahoo.com", Status: true, Delay: INIT_DELAY},
    {Name: "Bad Amazon", URI: "http://amazon.zom", Status: true, Delay: INIT_DELAY},
  }

One quick note here: in our slice of Servers, we intentionally introduced a typo in the last element. You'll notice amazon.zom, which will provoke an HTTP error in the checkServerStatus() method. Next, we kick off cycleServers() in its own goroutine and then listen on the master channel, relaying each server's latest status back to its own channel:

  go cycleServers(serverChan)

  for {
    select {
    case currentServer := <-serverChan:
      // Relay the server's most recent status to its own channel so that
      // updateDelay() can adjust the delay before the next check.
      currentServer.Channel <- currentServer.Status
    }
  }

  <- endChan
  
}

The following is an example of the output with the typo included:

Checking Server Google
        Server was OK on last check at 0001-01-01 00:00:00 +0000 UTC
        200 OK
        Will check server again
Checking Server Yahoo
        Server was OK on last check at 0001-01-01 00:00:00 +0000 UTC
        200 OK
        Will check server again
Checking Server Amazon
        Server was OK on last check at 0001-01-01 00:00:00 +0000 UTC
        Error:  Get http://amazon.zom: dial tcp: GetAddrInfoW: No such host is known.
        Will check server again
Checking Server Google
        Server was OK on last check at 2014-04-23 12:49:45.6575639 -0400 EDT

We'll be taking the preceding code for one last spin through some concurrency patterns later in this chapter, turning it into something a bit more practical.
