Chapter 5. Locks, Blocks, and Better Channels

Now that we're starting to get a good grasp of utilizing goroutines in safe and consistent ways, it's time to look a bit more at what causes code blocking and deadlocks. Let's also explore the sync package and dive into some profiling and analysis.

So far, we've built some relatively basic goroutines and complementary channels, but we now need to utilize some more complex communication channels between our goroutines. To do this, we'll implement more custom data types and apply them directly to channels.

We've not yet looked at some of Go's lower-level tools for synchronization and analysis, so we'll explore sync/atomic, a package that, along with sync.Mutex, allows for more granular control over state.

Finally, we'll delve into pprof, a fabulous tool provided by Go that lets us analyze our binaries for detailed information about our goroutines, threads, overall heap, and blocking profiles.

Armed with some new tools and methods to test and analyze our code, we'll be ready to build a robust, highly scalable web server that can safely and quickly handle heavy traffic.

Understanding blocking methods in Go

So far, we've encountered a few pieces of blocking code, intentional and unintentional, through our exploration and examples. At this point, it's prudent to look at the various ways we can introduce (or inadvertently fall victim to) blocking code.

By looking at the various ways Go code can be blocked, we can also be better prepared to debug cases when concurrency is not operating as expected in our application.

Blocking method 1 – a listening, waiting channel

The most concurrency-focused way to block your code is to leave a channel receive waiting on one or more goroutines. We've seen this a few times by now, but the basic concept is shown in the following code snippet:

package main

import (
  "fmt"
  "time"
)

func thinkAboutKeys() {
  for {
    fmt.Println("Still Thinking")
    time.Sleep(1 * time.Second)
  }
}

func main() {
  fmt.Println("Where did I leave my keys?")

  blockChannel := make(chan int)
  go thinkAboutKeys()

  // Nothing ever sends on blockChannel, so main blocks here forever.
  <-blockChannel

  fmt.Println("OK I found them!")
}

Despite the fact that all of our looping code is concurrent, we're waiting on a signal for our blockChannel to continue linear execution. We can, of course, see this in action by sending along the channel, thus continuing code execution as shown in the following code snippet:

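func thinkAboutKeys(bC chan int) {
  i := 0
  max := 10
  for {
    if i >= max {
      // Signal main that we're done, then stop looping.
      bC <- 1
      return
    }
    fmt.Println("Still Thinking")
    time.Sleep(1 * time.Second)
    i++
  }
}

// In main(), hand the channel to the goroutine:
//   go thinkAboutKeys(blockChannel)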

Here, we've modified our goroutine function to accept our blocking channel and deliver an end message to it when we've hit our maximum. These kinds of mechanisms are important for long-running processes because we may need to know when and how to kill them.
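Here is a minimal, runnable sketch of the same signal-when-done idea. It departs from the original in a few labeled ways. The goroutine closes a chan struct{} instead of sending an int; closing never blocks and releases every receiver. The iteration limit is also a parameter, and the sleep is shortened to 10 milliseconds so the demo finishes quickly. The findKeys name is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// thinkAboutKeys simulates limit rounds of work, then closes done to
// signal whoever is waiting. The deferred close runs after the loop, so
// every write to *thoughts happens before the receiver is released.
func thinkAboutKeys(done chan<- struct{}, limit int, thoughts *int) {
	defer close(done)
	for i := 0; i < limit; i++ {
		fmt.Println("Still Thinking")
		time.Sleep(10 * time.Millisecond)
		*thoughts++
	}
}

// findKeys blocks on the done channel until the goroutine finishes.
func findKeys(limit int) int {
	done := make(chan struct{})
	thoughts := 0
	go thinkAboutKeys(done, limit, &thoughts)
	<-done // blocks here until done is closed
	return thoughts
}

func main() {
	fmt.Println("Where did I leave my keys?")
	n := findKeys(3)
	fmt.Println("OK I found them after", n, "thoughts!")
}
```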

Sending more data types via channels

Because channels in Go are first-class values that can carry any type, including structs and functions, we have a lot of interesting ways of executing, or at least trying, new approaches to communication between goroutines.

One such example is a channel whose element type is a function: instead of sending plain data through the standard syntax, the sender passes a function and the receiver executes it. You can even send a slice or array of functions and have the receiver iterate through them.

Creating a function channel

So far, we've almost exclusively worked with channels that carry a single data type and a single value. So, let's try sending a function across a channel. Because functions are first-class values in Go, we need no abstraction to do this; we can send almost anything directly over a channel, as shown in the following code snippet:

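package main

import "fmt"

// abstractListener sends a function (not a plain value) across the channel.
func abstractListener(fxChan chan func() string) {
  fxChan <- func() string {
    return "Sent!"
  }
}

func main() {
  fxChan := make(chan func() string)
  defer close(fxChan)
  go abstractListener(fxChan)
  select {
  case rfx := <-fxChan:
    // Call the received function to produce the message.
    msg := rfx()
    fmt.Println(msg)
    fmt.Println("Received!")
  }
}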

This is like a callback function. However, it is also intrinsically different: it is not just a method called after another function finishes executing, but also serves as the mode of communication between functions.

Keep in mind that there are often alternatives to passing functions across channels, so this will likely be something very specific to a use case rather than a general practice.
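To make the "mode of communication" point concrete, the following sketch sends a slice of functions to a single worker goroutine. The worker executes each function and reports its result on a second channel. The runJobs name and the two-channel layout are illustrative assumptions, not a pattern taken from the chapter.

```go
package main

import "fmt"

// runJobs sends each function over jobCh to one worker goroutine, which
// executes it and sends the result back on resCh. With a single worker,
// results come back in the same order the jobs were sent.
func runJobs(jobs []func() string) []string {
	jobCh := make(chan func() string)
	resCh := make(chan string)

	// Worker: run every function it receives, then close resCh.
	go func() {
		for fx := range jobCh {
			resCh <- fx()
		}
		close(resCh)
	}()

	// Producer: feed the jobs in order, then close jobCh.
	go func() {
		for _, j := range jobs {
			jobCh <- j
		}
		close(jobCh)
	}()

	var results []string
	for r := range resCh {
		results = append(results, r)
	}
	return results
}

func main() {
	out := runJobs([]func() string{
		func() string { return "Sent!" },
		func() string { return "Sent again!" },
	})
	fmt.Println(out)
}
```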

Since your channel's type can be virtually any available type, this functionality opens up a world of possibilities, along with some potentially confusing abstractions. A struct or interface as a channel type is pretty self-explanatory, as you can make application-related decisions based on any of its defined properties.

Let's see an example of using an interface in this way in the next section.

Using an interface channel

As with our function channel, being able to pass interface values across a channel can be incredibly useful. Let's look at an example of sending values through an interface-typed channel:

package main

import (
  "fmt"
  "strconv"
)

type Messenger interface {
  Relay() string
}

type Message struct {
  status string
}

func (m Message) Relay() string {
  return m.status
}

func alertMessages(v chan Messenger, i int) {
  m := new(Message)
  m.status = "Done with " + strconv.FormatInt(int64(i), 10)
  v <- m
}

func main() {
  msg := make(chan Messenger)

  for i := 0; i < 10; i++ {
    go alertMessages(msg, i)
  }

  select {
  case message := <-msg:
    fmt.Println(message.Relay())
  }
  // Receive (and discard) one more message. The other eight goroutines
  // stay blocked on their sends until main returns.
  <-msg
}

This is a very basic example of how to use interfaces as channel types; in the previous example, the interface itself is largely ornamental. In actuality, we're passing newly created *Message values through the interface's channel rather than interacting directly with the interface.
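By contrast, the interface earns its keep when different concrete types share one channel. In this sketch, a hypothetical Alert type is added alongside Message. The receiver only ever calls Relay(), so it needs no knowledge of which type arrived. Results are sorted because the two sending goroutines can finish in either order.

```go
package main

import (
	"fmt"
	"sort"
)

type Messenger interface {
	Relay() string
}

// Message and Alert are unrelated types that both satisfy Messenger.
type Message struct{ status string }
type Alert struct{ code int }

func (m Message) Relay() string { return m.status }
func (a Alert) Relay() string   { return fmt.Sprintf("alert %d", a.code) }

// collect receives n values from ch, relying only on the Relay() method,
// and returns the relayed strings in sorted order.
func collect(ch <-chan Messenger, n int) []string {
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, (<-ch).Relay())
	}
	sort.Strings(out)
	return out
}

func main() {
	ch := make(chan Messenger)
	go func() { ch <- Message{status: "Done with 1"} }()
	go func() { ch <- Alert{code: 42} }()
	fmt.Println(collect(ch, 2)) // [Done with 1 alert 42]
}
```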

Using structs, interfaces, and more complex channels

Creating a custom type for our channel allows us to dictate the way our intra-channel communication will work while still letting Go dictate the context switching and behind-the-scenes scheduling.

Ultimately, this is mostly a design consideration. In the previous examples, we used individual channels for specific pieces of communication in lieu of a one-size-fits-all channel that passes a multitude of data. However, you may also find it advantageous to use a single channel to handle a large amount of communication between goroutines and other channels.

The primary consideration in deciding whether to segregate channels into individual bits of communication or a package of communications is the aggregate mutability of each.

For example, if you'll always want to send a counter along with a function or string and they will always be paired in terms of data consistency, such a method might make sense. If any of those components can lose synchronicity en route, it's more logical to keep each piece independent.
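To illustrate the paired case, the following sketch sends a sequence number and its payload together in one struct, so the receiver can never match a counter with the wrong string. With two separate channels, nothing would guarantee that the nth counter and the nth string were read together. The Envelope, produce, and consume names are hypothetical.

```go
package main

import "fmt"

// Envelope keeps a sequence number and its payload together in transit.
type Envelope struct {
	Seq  int
	Body string
}

// produce sends one Envelope per body on a single channel, then closes it.
func produce(bodies []string) <-chan Envelope {
	out := make(chan Envelope)
	go func() {
		defer close(out)
		for i, b := range bodies {
			out <- Envelope{Seq: i + 1, Body: b}
		}
	}()
	return out
}

// consume checks each envelope's Seq against the next expected value and
// collects the bodies in order. It reports false on the first gap.
func consume(in <-chan Envelope) ([]string, bool) {
	var bodies []string
	want := 1
	for e := range in {
		if e.Seq != want {
			return bodies, false
		}
		bodies = append(bodies, e.Body)
		want++
	}
	return bodies, true
}

func main() {
	bodies, ok := consume(produce([]string{"hello", "world"}))
	fmt.Println(bodies, ok) // [hello world] true
}
```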

Note

Maps in Go

As mentioned, maps in Go are like hash tables elsewhere and are used much like slices or arrays, except that they are indexed by keys rather than by integer positions.

In the previous example we were checking to see if a username/key exists already; for this purpose Go provides a simple method for doing so. Looking up a nonexistent key returns the zero value of the map's value type, and an optional second return value reports whether the key was actually present, as shown in the following lines of code:

if _, exists := Users[user.name]; exists {
  fmt.Fprintln(conn, "Unfortunately, that username is in use!")
}

This makes it syntactically simple and clean to test against a map and its keys.

One of the best features of maps in Go is the ability to make keys out of any comparable type. That includes strings, integers, Booleans, pointers, channels, and interfaces, as well as any struct or array composed exclusively of comparable types. Slices, maps, and functions are not comparable and cannot be used as keys.
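The following sketch shows why the comma-ok form matters: a plain lookup cannot tell a stored zero apart from a missing key. It also uses a struct as a key. The scores map, lookup function, and Point type are illustrative only.

```go
package main

import "fmt"

// Point has only comparable fields, so it can be used as a map key.
type Point struct{ X, Y int }

// lookup distinguishes "present with a zero value" from "absent",
// which a plain scores[name] expression cannot do.
func lookup(scores map[string]int, name string) (int, bool) {
	v, ok := scores[name]
	return v, ok
}

func main() {
	scores := map[string]int{"nathan": 0}
	fmt.Println(scores["nathan"], scores["guest"]) // 0 0
	_, ok1 := lookup(scores, "nathan")
	_, ok2 := lookup(scores, "guest")
	fmt.Println(ok1, ok2) // true false

	visits := map[Point]int{}
	visits[Point{1, 2}]++
	visits[Point{1, 2}]++
	fmt.Println(visits[Point{1, 2}]) // 2
}
```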

This one-to-many channel can work as a master-slave or broadcaster-subscriber model. We'll have a channel that listens for messages and routes them to appropriate users and a channel that listens for broadcast messages and queues them to all users.

To best demonstrate this, we'll create a simple multiuser chat system. It supports Twitter-style @user messages to a single user and broadcasts standard messages to all users, so that every user can read them. Both will be simple, custom struct-typed channels, so we can delineate the various pieces of communication.
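Before building the real server, here is a minimal sketch of the routing idea. It assumes a hypothetical ChatMessage struct whose To field is empty for broadcasts and holds a username for direct messages. The sketch is single-goroutine and uses buffered inboxes so Route does not block in the demo. A real server would need to own the map from one goroutine or guard it with a mutex.

```go
package main

import "fmt"

// ChatMessage is hypothetical: To == "" means broadcast; otherwise it is
// a direct @user message.
type ChatMessage struct {
	From, To, Body string
}

// Hub routes each message to one subscriber or to all of them.
type Hub struct {
	subscribers map[string]chan ChatMessage
}

func NewHub() *Hub {
	return &Hub{subscribers: make(map[string]chan ChatMessage)}
}

// Subscribe registers a user with a buffered inbox of the given size.
func (h *Hub) Subscribe(name string, buffer int) <-chan ChatMessage {
	ch := make(chan ChatMessage, buffer)
	h.subscribers[name] = ch
	return ch
}

// Route delivers direct messages to the named user only and broadcasts
// everything else. Unknown recipients are silently dropped, matching the
// chapter's server, which does not report missing users. Sends block if
// an inbox is full.
func (h *Hub) Route(m ChatMessage) {
	if m.To != "" {
		if ch, ok := h.subscribers[m.To]; ok {
			ch <- m
		}
		return
	}
	for _, ch := range h.subscribers {
		ch <- m
	}
}

func main() {
	h := NewHub()
	alice := h.Subscribe("alice", 4)
	bob := h.Subscribe("bob", 4)
	h.Route(ChatMessage{From: "nathan", Body: "hi all"})
	h.Route(ChatMessage{From: "nathan", To: "bob", Body: "psst"})
	fmt.Println(len(alice), len(bob)) // 1 2
}
```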

Note

Structs in Go

As a first-class, anonymous, and extensible type, a struct is one of the most versatile and useful data constructs available. It's simple to create analogs to other data structures such as databases and data stores, and while we hesitate to call them objects they can certainly be viewed as such.

The rule of thumb as it pertains to using structs within functions is to pass by "reference" (that is, pass a pointer) rather than by value if the struct is particularly complex. Two points of clarification are as follows:

  • "Reference" is in quotation marks because (and this is confirmed by Go's FAQ) technically everything in Go is passed by value. Passing a pointer copies the pointer itself, not the struct it points to. That is why it is cheaper for large structs, and why changes made through it are visible to the caller.
  • "Particularly complex" is, understandably, tough to quantify, so personal judgment might come into play. However, we can consider a struct simple if it has no more than about five fields. Only fields are copied when a struct is passed by value, so the number of methods does not affect the cost.
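A short sketch of the difference, using a hypothetical Profile struct: the value version mutates a copy, while the pointer version mutates the caller's struct through a copied pointer.

```go
package main

import "fmt"

// Profile is a small illustrative struct.
type Profile struct {
	Name   string
	Visits int
}

// bumpByValue receives a copy of the struct; the caller's Profile is unchanged.
func bumpByValue(p Profile) { p.Visits++ }

// bumpByPointer receives a copy of the pointer, which still points at the
// caller's Profile, so the change is visible to the caller.
func bumpByPointer(p *Profile) { p.Visits++ }

func main() {
	p := Profile{Name: "nathan"}
	bumpByValue(p)
	fmt.Println(p.Visits) // 0
	bumpByPointer(&p)
	fmt.Println(p.Visits) // 1
}
```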

You can think of this in terms of a help desk system, and while in the present day we'd be unlikely to create a command-line interface for such a thing, eschewing the web portion allows us to gloss over all of the client-side code that isn't necessarily relevant to Go.

You could certainly take such an example and extrapolate it to the Web utilizing some frontend libraries for asynchronous functionality (such as backbone.js or socket.io).

To accomplish this, we'll need to create both a client and a server application, and we'll try to keep each as bare-bones as possible. You can clearly and simply augment this to include any functionality you see fit, such as making Git comments and updating a website.

We'll start with the server, which will be the most complicated part. The client application will mostly receive messages back through the socket, so much of the reading and routing logic will be invisible to the client-side of the process.

The net package – a chat server with interfaced channels

Here, we'll need to introduce a relevant package that will be required to handle most of the communication for our application(s). We've touched on the net package a bit while dabbling in the SVG output generation example to show concurrency. The net/http package is just a small part of a broader, more complex, and more full-featured package.

The basic components that we'll be using will be a TCP listener (server) and a TCP dialer (client). Let's look at the basic setup for these.

Server

Listening on a TCP port couldn't be easier. Simply call the net.Listen() function and handle the error, as shown in the following lines of code:

  listener, err := net.Listen("tcp", ":9000")
  if err != nil {
    fmt.Println("Could not start server!", err)
    return
  }

If you get an error starting the server, check your firewall or modify the port—it's possible that something is utilizing port 9000 on your system.

As easy as that is, it's just as simple on our client/dialer side.

Client

In this case, we have everything running on localhost as shown in the following lines of code. However, in a real-world application we'd probably have an intranet address used here:

  conn, err := net.Dial("tcp", "127.0.0.1:9000")
  if err != nil {
    fmt.Println("Could not connect to server!", err)
    return
  }
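To see both halves working together without a second terminal, the following sketch runs a listener and a dialer in one process. It makes a few assumptions that are not in the chapter:

- It listens on port 0, letting the OS choose a free port so it won't collide with 9000.
- The server side is a simple echo server.
- The client uses TCP half-close via (*net.TCPConn).CloseWrite() to signal that it has finished sending.

io.ReadAll requires Go 1.16 or later, and the echoOnce name is hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// echoOnce listens on an OS-assigned localhost port, dials it, sends msg,
// and returns whatever the server echoed back.
func echoOnce(msg string) (string, error) {
	// Port 0 asks the OS for any free port.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer listener.Close()

	// Server side: echo bytes back until the client stops sending.
	go func() {
		conn, err := listener.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		buf := make([]byte, 140)
		for {
			n, err := conn.Read(buf)
			if n > 0 {
				conn.Write(buf[:n]) // echo exactly the n bytes received
			}
			if err != nil {
				return // io.EOF once the client half-closes
			}
		}
	}()

	// Client side.
	conn, err := net.Dial("tcp", listener.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	// Half-close: tell the server we're done writing, but keep reading.
	if tcp, ok := conn.(*net.TCPConn); ok {
		tcp.CloseWrite()
	}
	reply, err := io.ReadAll(conn) // reads until the server closes
	if err != nil {
		return "", err
	}
	return string(reply), nil
}

func main() {
	reply, err := echoOnce("hello")
	if err != nil {
		fmt.Println("echo failed:", err)
		return
	}
	fmt.Println(reply)
}
```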

In this application, we demonstrate two different ways to handle byte buffers of unknown lengths on Read(). The first is a rather crude method of trimming a string using strings.TrimRight(). This function takes a cutset of characters you aren't interested in keeping at the end of the input, as shown in the following line of code. Here, the cutset is whitespace: spaces, tabs, and line breaks.

sendMessage := []byte(user.Name + ": " +
  strings.TrimRight(string(buf), " \t\r\n"))

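Dealing with strings this way is often both inelegant and unreliable. What happens if we get something we don't expect here? Only characters in the cutset are removed, and the unused part of the buffer is filled with zero bytes rather than whitespace. So the string will still be the full length of the buffer, which in this case is 140 bytes.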

The other way we deal with this is by using the end of the buffer directly. In this case, we assign the first return value of conn.Read() (the number of bytes read) to the n variable. We then use it as the slice bound when converting the buffer to a string, as shown in the following lines of code:

messBuff := make([]byte, 1024)
n, err := conn.Read(messBuff)
if err != nil {
  fmt.Println("Read error", err)
  return
}
message := string(messBuff[:n])

Here we're taking the first n bytes of the message buffer's received value.

This is more reliable and efficient, but you will certainly run into text ingestion cases where you will want to remove certain characters to create cleaner input.
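The following sketch simulates a conn.Read() into a 140-byte buffer to compare the two approaches. The helpers bufferOf, trimWhole, and trimToN are hypothetical. The point is that the unused tail of the buffer is zero bytes, which a whitespace cutset does not remove.

```go
package main

import (
	"fmt"
	"strings"
)

// bufferOf simulates a conn.Read into a fixed-size buffer: msg is copied
// to the front, the rest stays as zero bytes, and n is the byte count.
func bufferOf(msg string, size int) ([]byte, int) {
	buf := make([]byte, size)
	n := copy(buf, msg)
	return buf, n
}

// trimWhole is method 1: trim a whitespace cutset from the whole buffer.
func trimWhole(buf []byte) string {
	return strings.TrimRight(string(buf), " \t\r\n")
}

// trimToN is method 2: slice by n first, then trim the line ending.
func trimToN(buf []byte, n int) string {
	return strings.TrimRight(string(buf[:n]), " \t\r\n")
}

func main() {
	buf, n := bufferOf("hello\n", 140)
	fmt.Println(len(trimWhole(buf)))  // 140: the zero bytes survive
	fmt.Println(len(trimToN(buf, n))) // 5
}
```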

Each connection in this application is a struct and each user is as well. We keep track of our users by pushing them to the Users slice as they join.

The selected username is a command-line argument as follows:

./chat-client nathan
chat-client.exe nathan

We do not check to ensure there is only one user with that name, so that logic might be required, particularly if chats with direct messages contain sensitive information.

Handling direct messages

For the most part, this chat application is a simple echo server, but as mentioned, we also include the ability to send messages that are not broadcast globally by using the Twitter-style @ syntax.

We handle this mainly through regular expressions, wherein if a message matches @user then only that user will see the message; otherwise, it's broadcasted to all. This is somewhat inelegant, because senders of the direct message will not see their own direct message if their usernames do not match the intended names of the users.

To do this, we direct every message through an evalMessageRecipient() function before broadcasting. This function builds a regular expression from user input (the username). So we should escape that input with the regexp.QuoteMeta() function; otherwise characters such as . or + in a name would be treated as regex operators rather than matched literally.
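Here is a minimal sketch of that escaping, written as a standalone check rather than the chapter's evalMessageRecipient(). It assumes that a mention ends at a non-word character or at the end of the message, which also stops @bob from matching @bobby. The isVisibleTo name is hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// isVisibleTo reports whether msg should be delivered to uName.
// Messages with no @ are public; otherwise only the mentioned user sees
// them. regexp.QuoteMeta escapes metacharacters in the name, so the
// pattern is always valid and MustCompile will not panic.
func isVisibleTo(msg, uName string) bool {
	if !strings.Contains(msg, "@") {
		return true
	}
	re := regexp.MustCompile("@" + regexp.QuoteMeta(uName) + `(?:$|\W)`)
	return re.MatchString(msg)
}

func main() {
	fmt.Println(isVisibleTo("nathan: @bob hi", "bob"))   // true
	fmt.Println(isVisibleTo("nathan: @bobby hi", "bob")) // false
	fmt.Println(isVisibleTo("x: @axb hi", "a.b"))        // false: "." is literal
}
```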

Let's first examine our chat server, which is responsible for maintaining all connections and passing them to goroutines to listen and receive, as shown in the following code:

chat-server.go
package main

import (
  "fmt"
  "net"
  "regexp"
  "strconv"
  "strings"
)

var connectionCount int
var messagePool chan(string)

const (
  INPUT_BUFFER_LENGTH = 140
)

We utilize a maximum buffer size. This limits each read to at most 140 bytes (140 characters for plain ASCII text). A longer message is simply split across several reads rather than rejected. Let's look at our User struct to see the information we might keep about a user that joins, as follows:

type User struct {
  Name       string
  ID         int
  Initiated  bool
  UChannel   chan []byte
  Connection *net.Conn
}

The User struct contains all of the information we will maintain for each connection. The Initiated field tells us that the User is connected, after a connection and announcement. Keep in mind here we don't do any sanity checking to make sure a user doesn't exist. This doesn't necessarily pose a problem in an example, but a real chat client would benefit from a response should a user name already be in use.

Let's examine the following code to understand the way we'd listen on a channel for a logged-in user:

func (u *User) Listen() {
  fmt.Println("Listening for",u.Name)
  for {
    select {
      case msg := <- u.UChannel:
        fmt.Println("Sending new message to",u.Name)
        fmt.Fprintln(*u.Connection,string(msg))

    }
  }
}

This is the core of our server: each User runs its own Listen() goroutine, which receives messages on the User struct's channel and writes them to that user's connection. Put simply, each user gets a concurrent channel of his or her own. Let's take a look at the ConnectionManager struct and the Initiate() function that creates our server in the following code:

type ConnectionManager struct {
  name      string
  initiated bool
}

func Initiate() *ConnectionManager {
  cM := &ConnectionManager{
    name:      "Chat Server 1.0",
    initiated: false,
  }

  return cM
}

Our ConnectionManager struct is initiated just once. This sets some relatively ornamental attributes, some of which could be returned on request or on chat login. We'll examine the evalMessageRecipient function that attempts to roughly identify the intended recipient of any message sent as follows:

func evalMessageRecipient(msg []byte, uName string) bool {
  eval := true
  expression := "@"
  re, err := regexp.MatchString(expression, string(msg))
  if err != nil {
    fmt.Println("Error:", err)
  }
  if re {
    eval = false
    // Escape the username so characters such as "." or "+" are
    // matched literally rather than as regex operators.
    pmExpression := "@" + regexp.QuoteMeta(uName)
    pmRe, pmErr := regexp.MatchString(pmExpression, string(msg))
    if pmErr != nil {
      fmt.Println("Regex error", pmErr)
    }
    if pmRe {
      eval = true
    }
  }
  return eval
}

This is our router of sorts: it takes the @ part of the string and uses it to detect an intended recipient, hiding the message from public consumption. We do not return an error if the user doesn't exist or has left the chat.

Note

The format for regular expressions using the regexp package relies on the re2 syntax, which is described at https://code.google.com/p/re2/wiki/Syntax.

Let's take a look at the code for the Listen() method of the ConnectionManager struct:

func (cM *ConnectionManager) Listen(listener net.Listener) {
  fmt.Println(cM.name, "Started")
  for {

    conn, err := listener.Accept()
    if err != nil {
      fmt.Println("Connection error", err)
      // Without a valid conn there is nothing to hand off;
      // wait for the next connection instead.
      continue
    }
    connectionCount++
    fmt.Println(conn.RemoteAddr(), "connected")
    user := User{Name: "anonymous", ID: 0, Initiated: false}
    Users = append(Users, &user)
    for _, u := range Users {
      fmt.Println("User online", u.Name)
    }
    fmt.Println(connectionCount, "connections active")
    go cM.messageReady(conn, &user)
  }
}

func (cM *ConnectionManager) messageReady(conn net.Conn, user *User) {
  uChan := make(chan []byte)

  for {

    buf := make([]byte, INPUT_BUFFER_LENGTH)
    n, err := conn.Read(buf)
    if err != nil || n == 0 {
      // The client disconnected or the read failed: stop handling
      // this connection instead of reading from a closed conn.
      conn.Close()
      return
    }
    fmt.Println(n, "character message from user", user.Name)
    if user.Initiated == false {
      fmt.Println("New User is", string(buf[:n]))
      user.Initiated = true
      user.UChannel = uChan
      user.Name = string(buf[:n])
      user.Connection = &conn
      go user.Listen()

      minusYouCount := strconv.FormatInt(int64(connectionCount-1), 10)
      conn.Write([]byte("Welcome to the chat, " + user.Name +
        ", there are " + minusYouCount + " other users"))

    } else {

      sendMessage := []byte(user.Name + ": " +
        strings.TrimRight(string(buf), " \t\r\n"))

      for _, u := range Users {
        // Skip users who haven't sent their name yet: their UChannel
        // is still nil, and a send on a nil channel blocks forever.
        if u.Initiated && evalMessageRecipient(sendMessage, u.Name) {
          u.UChannel <- sendMessage
        }
      }

    }

  }
}

The messageReady (per connectionManager) function instantiates new connections into a User struct, utilizing the first sent message as the user's name.

var Users []*User

This is our slice of pointers to User structs; it grows as users join.

func main() {
  connectionCount = 0
  serverClosed := make(chan bool)

  listener, err := net.Listen("tcp", ":9000")
  if err != nil {
    fmt.Println("Could not start server!", err)
    return
  }

  connManage := Initiate()
  go connManage.Listen(listener)

  // Nothing ever sends on serverClosed, so main blocks here and the
  // server keeps running.
  <-serverClosed
}

As expected, main() primarily handles the listener setup and its error. It keeps our server process alive by blocking on the serverClosed channel, which nothing ever sends on.
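One thing to watch in this server: connectionCount is incremented by the Listen() goroutine while every messageReady() goroutine reads it, which is a data race. Here is a sketch of the sync/atomic approach mentioned at the start of the chapter. It assumes the counter is changed to an int64, and the connect, disconnect, active, and simulate helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// connectionCount is updated from many goroutines, so every access goes
// through sync/atomic instead of a plain ++ or read.
var connectionCount int64

func connect()      { atomic.AddInt64(&connectionCount, 1) }
func disconnect()   { atomic.AddInt64(&connectionCount, -1) }
func active() int64 { return atomic.LoadInt64(&connectionCount) }

// simulate runs n concurrent connects, waits, then runs m concurrent
// disconnects, and returns the resulting count.
func simulate(n, m int) int64 {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); connect() }()
	}
	wg.Wait()
	for i := 0; i < m; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); disconnect() }()
	}
	wg.Wait()
	return active()
}

func main() {
	fmt.Println(simulate(100, 40)) // 60
}
```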

There are a number of methods we could employ to improve the way we route messages. The first would be to use a map (or hash table) keyed by username. If the key already exists, we could return an error indicating that the name is taken. The basic setup looks like the following code snippet:

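type User struct {
  name string
}
var Users map[string]*User

func main() {
  // Assign to the package-level Users; using := here would declare a
  // new local variable that shadows it.
  Users = make(map[string]*User)
}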
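Building on that setup, the following sketch adds the duplicate check and guards the map with a sync.Mutex. The mutex is needed because each connection is handled in its own goroutine and Go maps are not safe for concurrent writes. The userRegistry type, its methods, and errNameTaken are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type User struct{ name string }

// userRegistry maps usernames to users; mu serializes access because
// connections are handled concurrently.
type userRegistry struct {
	mu    sync.Mutex
	users map[string]*User
}

var errNameTaken = errors.New("username already in use")

func newUserRegistry() *userRegistry {
	return &userRegistry{users: make(map[string]*User)}
}

// Add registers name, or returns errNameTaken if it already exists.
func (r *userRegistry) Add(name string) (*User, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.users[name]; exists {
		return nil, errNameTaken
	}
	u := &User{name: name}
	r.users[name] = u
	return u, nil
}

// Remove frees a name, e.g. when a user disconnects.
func (r *userRegistry) Remove(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.users, name)
}

func main() {
	reg := newUserRegistry()
	_, err1 := reg.Add("nathan")
	_, err2 := reg.Add("nathan")
	fmt.Println(err1, err2) // <nil> username already in use
}
```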

Examining our client

Our client application is a bit simpler primarily because we don't care as much about blocking code.

While we do have two concurrent operations (waiting for incoming messages and waiting for user input to send), this is significantly less complicated than our server. The server needs to listen concurrently to every connected user and distribute each sent message to its recipients.

Let's now compare our chat client to our chat server. Obviously, the client has less overall maintenance of connections and users, and so we do not need to use nearly as many channels. Let's take a look at our chat client's code:

chat-client.go
package main

import (
  "bufio"
  "fmt"
  "net"
  "os"
  "strings"
)

type Message struct {
  message string
  user    string
}

var recvBuffer [140]byte

func listen(conn net.Conn) {
  for {
    messBuff := make([]byte, 1024)
    n, err := conn.Read(messBuff)
    if err != nil {
      // The connection is gone (the server shut down, or talk() closed
      // it after stdin ended), so exit instead of spinning on the error.
      fmt.Println("Disconnected:", err)
      os.Exit(0)
    }
    message := string(messBuff[:n])

    fmt.Println(strings.TrimSpace(message))
    fmt.Print("> ")
  }
}

func talk(conn net.Conn, mS chan Message) {
  // Create the reader once: a new bufio.Reader on every iteration could
  // discard input that the previous reader had already buffered.
  command := bufio.NewReader(os.Stdin)
  for {
    fmt.Print("> ")
    line, err := command.ReadString('\n')
    if err != nil {
      conn.Close()
      return
    }
    line = strings.TrimRight(line, " \t\r\n")
    if _, err = conn.Write([]byte(line)); err != nil {
      conn.Close()
      return
    }
  }
}

func main() {
  messageServer := make(chan Message)

  if len(os.Args) < 2 {
    fmt.Println("Usage: chat-client <username>")
    return
  }
  userName := os.Args[1]

  fmt.Println("Connecting to host as", userName)

  clientClosed := make(chan bool)

  conn, err := net.Dial("tcp", "127.0.0.1:9000")
  if err != nil {
    fmt.Println("Could not connect to server!", err)
    return
  }
  conn.Write([]byte(userName))
  introBuff := make([]byte, 1024)
  n, err := conn.Read(introBuff)
  if err != nil {
    fmt.Println("Could not read welcome message", err)
    return
  }
  message := string(introBuff[:n])
  fmt.Println(message)

  go talk(conn, messageServer)
  go listen(conn)

  <-clientClosed
}

Blocking method 2 – the select statement in a loop

Have you noticed yet that the select statement itself blocks? Fundamentally, a select statement without a default case is no different from an open listening channel; it's just wrapped in conditional code.

A bare <-myChannel receive operates the same way as the following code snippet:

select {
  case <-myChannel:
    // do something
}

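A listening channel is not treated as a deadlock as long as some other goroutine can still make progress. The runtime only aborts with "fatal error: all goroutines are asleep - deadlock!" when every goroutine is blocked. You'll find this on channels that are listening but will never receive anything, which is another method of basically waiting.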

These are useful shortcuts for long-running applications you wish to keep alive but you may not necessarily need to send anything along that channel.
