Implementing channels

So far, we've dabbled in concurrent processes that are capable of doing a lot but not of communicating effectively with each other. If you have two processes sharing the same processing time, memory, and data, you need a way of knowing where each process is within the larger task.

Take, for example, an application that must loop through one paragraph of Lorem Ipsum, capitalize each letter, and then write out the result. Of course, we don't really need a concurrent application to do this (in fact, uppercasing is built into almost any language that handles strings), but it's a quick way to demonstrate the limitations of isolated goroutines. Shortly, we'll turn this primitive example into something more practical, but for now, here's the beginning of our capitalization example:

package main

import (
  "fmt"
  "runtime"
  "strings"
)

var loremIpsum string
var finalIpsum string
var letterSentChan chan string // declared for later use; not used yet

// deliverToFinal appends one capitalized letter to the shared result.
func deliverToFinal(letter string, finalIpsum *string) {
  *finalIpsum += letter
}

// capitalize walks the shared index and uppercases each letter. Both
// goroutines read and increment *current with no synchronization, so
// they race with each other: this is deliberate, to show the problem.
func capitalize(current *int, length int, letters []byte,
  finalIpsum *string) {
  for *current < length {
    thisLetter := strings.ToUpper(string(letters[*current]))

    deliverToFinal(thisLetter, finalIpsum)
    *current++
  }
}

func main() {

  runtime.GOMAXPROCS(2)

  index := new(int)
  *index = 0
  // A double-quoted Go string can't span lines, so the paragraph
  // is built by concatenation.
  loremIpsum = "Lorem ipsum dolor sit amet, consectetur adipiscing " +
    "elit. Vestibulum venenatis magna eget libero tincidunt, ac " +
    "condimentum enim auctor. Integer mauris arcu, dignissim sit amet " +
    "convallis vitae, ornare vel odio. Phasellus in lectus risus. Ut " +
    "sodales vehicula ligula eu ultricies. Fusce vulputate fringilla " +
    "eros at congue. Nulla tempor neque enim, non malesuada arcu " +
    "laoreet quis. Aliquam eget magna metus. Vivamus lacinia " +
    "venenatis dolor, blandit faucibus mi iaculis quis. Vestibulum " +
    "sit amet feugiat ante, eu porta justo."

  letters := []byte(loremIpsum)
  length := len(letters)

  // Two goroutines share the same index and result with no
  // communication, and main does not wait for either of them,
  // so the printed result is unpredictable.
  go capitalize(index, length, letters, &finalIpsum)
  go func() {
    go capitalize(index, length, letters, &finalIpsum)
  }()

  fmt.Println(length, " characters.")
  fmt.Println(loremIpsum)
  fmt.Println(*index)
  fmt.Println(finalIpsum)

}

If we run this with some degree of parallelism here but no communication between our goroutines, we'll end up with a jumbled mess of text, as shown in the following screenshot:

(Screenshot: jumbled output from the unsynchronized capitalization example)

Because goroutine scheduling in Go is unpredictable, as this example demonstrates, it may take many runs to reproduce this exact output. In fact, you may never get exactly this output.

This won't do, obviously. So how do we best structure this application? The missing piece here is synchronization, but we could also do with a better design pattern.

Here's another way to break this problem down. Instead of having two processes handle the same task in parallel, which is rife with risk, let's have one process take a letter from the loremIpsum string and capitalize it, and then pass it on to another process that adds it to our finalIpsum string.

You can envision this as two people sitting at two desks, each with a stack of letters. Person A is responsible for taking a letter and capitalizing it. He then passes the letter on to person B, who adds it to the finalIpsum stack. To do this, we'll implement a channel in an application tasked with taking text (in this case, the first line of Abraham Lincoln's Gettysburg Address) and capitalizing each letter.

Channel-based sorting at the letter capitalization factory

Let's take the last example and do something (slightly) more purposeful by attempting to capitalize the preamble of Abraham Lincoln's Gettysburg Address while mitigating the sometimes unpredictable effects of concurrency in Go, as shown in the following code:

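package main

import (
  "fmt"
  "runtime"
  "strings"
  "sync"
)

var initialString string
var finalString string

var stringLength int

// addToFinalStack receives one capitalized letter from the channel,
// appends it to finalString, and marks its part of the step done.
func addToFinalStack(letterChannel chan string, wg *sync.WaitGroup) {
  letter := <-letterChannel
  finalString += letter
  wg.Done()
}

// capitalize uppercases one letter and sends it on the channel. The
// send blocks until addToFinalStack receives it, so wg.Wait() in main
// cannot return before the letter has been appended.
func capitalize(letterChannel chan string, currentLetter string,
  wg *sync.WaitGroup) {

  thisLetter := strings.ToUpper(currentLetter)
  wg.Done()
  letterChannel <- thisLetter
}

func main() {

  runtime.GOMAXPROCS(2)
  var wg sync.WaitGroup

  // Concatenated because a double-quoted Go string can't span lines.
  initialString = "Four score and seven years ago our fathers " +
    "brought forth on this continent, a new nation, conceived in " +
    "Liberty, and dedicated to the proposition that all men are " +
    "created equal."
  initialBytes := []byte(initialString)

  var letterChannel chan string = make(chan string)

  stringLength = len(initialBytes)

  // One letter per iteration: start both goroutines, then wait for
  // both, so letters are appended in their original order.
  for i := 0; i < stringLength; i++ {
    wg.Add(2)

    go capitalize(letterChannel, string(initialBytes[i]), &wg)
    go addToFinalStack(letterChannel, &wg)

    wg.Wait()
  }

  fmt.Println(finalString)

}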

Note that we even raised GOMAXPROCS to 2, allowing the code to run on two cores, and still ended up with the following output:

go run alpha-channel.go
FOUR SCORE AND SEVEN YEARS AGO OUR FATHERS BROUGHT FORTH ON THIS 
  CONTINENT, A NEW NATION, CONCEIVED IN LIBERTY, AND DEDICATED TO THE 
  PROPOSITION THAT ALL MEN ARE CREATED EQUAL.

The output is just as we expected. It's worth reiterating that this example is extreme overkill, but we'll build on this functionality to create a practical application shortly.

So what's happening here? First, we reused the sync.WaitGroup struct to let all of our concurrent code finish executing while keeping the main goroutine alive, as shown in the following code snippet:

var wg sync.WaitGroup
...
for i := 0; i < stringLength; i++ {
  wg.Add(2)

  go capitalize(letterChannel, string(initialBytes[i]), &wg)
  go addToFinalStack(letterChannel, &wg)

  wg.Wait()
}

Each goroutine tells the WaitGroup struct when it has finished its part of the step. As we launch two goroutines per iteration, we call wg.Add(2) to set the WaitGroup counter to two. Each goroutine is responsible for announcing that it's done by calling wg.Done(), and wg.Wait() blocks until the counter drops back to zero.
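The Add/Done/Wait counting can be seen in isolation in the following minimal sketch. It is not code from the capitalization program: runBoth is a hypothetical helper, and it calls defer wg.Done() so that each goroutine signals completion as soon as its function returns.

```go
package main

import (
	"fmt"
	"sync"
)

// runBoth starts a and b in separate goroutines and returns only after
// both have finished. Add(2) raises the WaitGroup counter to two, each
// Done lowers it by one, and Wait blocks until it reaches zero.
func runBoth(a, b func()) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done() // signal completion once a returns
		a()
	}()
	go func() {
		defer wg.Done() // signal completion once b returns
		b()
	}()
	wg.Wait()
}

func main() {
	var left, right string
	runBoth(
		func() { left = "capitalized" },
		func() { right = "stacked" },
	)
	// Safe to read: Wait returns only after both goroutines have written.
	fmt.Println(left, right)
}
```

Because Wait does not return until both Done calls have happened, main can read left and right without any further synchronization.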

Next, we created our first channel. We instantiate a channel with the following line of code:

  var letterChannel chan string = make(chan string)

This creates a channel that carries string values between goroutines. You can think of it as the coordinator for our goroutines: it carries data from one to another and, because a send on this (unbuffered) channel waits for a matching receive, it also governs the order in which they proceed. As we mentioned earlier, channel communication is handled by the Go runtime's own internal context switching rather than by relying on OS-level multithreading, which lets it operate very quickly.

There is a built-in limit to this functionality. If you design non-concurrent or blocking code, you will effectively remove concurrency from goroutines. We will talk more about this shortly.

We run two separate goroutines that communicate through letterChannel: capitalize() and addToFinalStack(). The first one simply takes a single letter, converted from one byte of the byte array constructed from our string, and capitalizes it. It then sends the result into the channel, as shown in the following line of code:

letterChannel <- thisLetter

All communication across a channel happens in this fashion. The <- operator shows the direction in which data flows: with the channel on its left (letterChannel <- thisLetter), a value is sent into the channel; placed in front of the channel (<-letterChannel), it receives a value from it. You're never required to do anything with the data you receive, but the most important thing to know is that operations on an unbuffered channel block: a send waits until another goroutine receives, and a receive waits until another goroutine sends. You can test this by creating a channel and then doing absolutely nothing of value with it, as shown in the following code snippet:

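package main

func doNothing() string {
  return "nothing"
}

func main() {
  // An unbuffered channel: a send blocks until another goroutine receives.
  var channel chan string = make(chan string)
  // No other goroutine exists to receive, so this send blocks
  // forever and the runtime aborts with a deadlock error.
  channel <- doNothing()
}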

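Because nothing ever receives from the channel, and no other goroutine exists that could, the send blocks main forever; Go's runtime detects this and aborts with "fatal error: all goroutines are asleep - deadlock!". You can fix this easily by performing the send (or the receive) in a separate goroutine; creating the channel outside of main(), in the global space, makes it easy for that goroutine to reach it.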
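One way to apply that fix is sketched below, assuming we keep doNothing() unchanged. The channel is declared at package level, a goroutine performs the send, and the caller performs the receive, so each side of the unbuffered channel has a partner. receiveNothing is a hypothetical helper introduced only for this illustration.

```go
package main

import "fmt"

// Declared at package level, outside main(), so any goroutine can use it.
var channel = make(chan string)

func doNothing() string {
	return "nothing"
}

// receiveNothing hands the send to a separate goroutine and receives in
// the calling goroutine, so the send no longer blocks forever.
func receiveNothing() string {
	go func() {
		channel <- doNothing()
	}()
	return <-channel
}

func main() {
	fmt.Println(receiveNothing()) // prints "nothing"
}
```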

Note

For the sake of clarity, our example here uses a local scope channel. Keeping these global whenever possible removes a lot of cruft, particularly if you have a lot of goroutines, as references to the channel can clutter up your code in a hurry.

For our example as a whole, you can look at it as is shown in the following figure:

(Figure: Channel-based sorting at the letter capitalization factory)

Cleaning up our goroutines

You may be wondering why we need a WaitGroup struct when using channels. After all, didn't we say that channel operations block until the other side is ready? This is true, but that behavior depends on how the channel is created.

Sending to or receiving from a nil (uninitialized) channel blocks forever. We will discuss the potential uses and pitfalls of this in Chapter 7, Performance and Scalability, and Chapter 10, Advanced Concurrency and Best Practices.

You can control how a channel blocks the application by passing a second argument to make(), which sets the channel's buffer size.
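The nil-channel behavior can be checked without hanging the program. The minimal sketch below uses a select statement with a default case (select is covered later in this section), which runs the default branch whenever the send could not proceed immediately. ready is a hypothetical helper, not part of any library.

```go
package main

import "fmt"

// ready reports whether a send on ch could proceed right now. The default
// case runs only when the send would block, so ready never waits.
func ready(ch chan int) bool {
	select {
	case ch <- 1:
		return true
	default:
		return false
	}
}

func main() {
	var nilCh chan int // declared but never made, so it is nil

	fmt.Println(ready(nilCh))             // false: a nil channel is never ready
	fmt.Println(ready(make(chan int)))    // false: unbuffered, no receiver
	fmt.Println(ready(make(chan int, 1))) // true: the buffer has room
}
```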

Buffered or unbuffered channels

By default, channels are unbuffered, which means a send on them completes only when there is a goroutine ready to receive it. It also means that every send blocks the sending goroutine until a receiver takes the value, and every receive blocks until a sender provides one. By providing a buffer, a send blocks only when the buffer is full, and a receive blocks only when the buffer is empty.

An unbuffered channel is synchronous: each send is handed directly to a receiver. To let senders run ahead of receivers asynchronously, you'll want to experiment with providing a buffer length. We'll look at ways to ensure our execution behaves as we expect in the next chapter.
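The effect of the buffer argument can be sketched as follows. fillBuffer is a hypothetical helper: it sends n values into a channel of capacity n from a single goroutine, which only works because the buffer absorbs every send; with an unbuffered channel the first send would block and, with no receiver, deadlock.

```go
package main

import "fmt"

// fillBuffer sends n values into a channel created with capacity n.
// Because the buffer has room for all of them, none of the sends block,
// even though no other goroutine is receiving yet.
func fillBuffer(n int) chan int {
	ch := make(chan int, n) // second argument to make: buffer capacity
	for i := 0; i < n; i++ {
		ch <- i
	}
	return ch
}

func main() {
	ch := fillBuffer(3)
	fmt.Println(len(ch), cap(ch)) // 3 3: the buffer is full
	// Receives drain the buffer in the order the values were sent.
	fmt.Println(<-ch, <-ch, <-ch) // 0 1 2
}
```

len reports how many values are currently queued and cap reports the buffer size; a fourth send inside fillBuffer would block until something received.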

Note

Go's channel system is based on Communicating Sequential Processes (CSP), a formal language to design concurrent patterns and multiprocessing. You will likely encounter CSP on its own when people describe goroutines and channels.

Using the select statement

One of the issues with first implementing channels is that, whereas goroutines were formerly a simple way to execute code concurrently, we now have a single-purpose channel that dictates application logic across the goroutines. Sure, the channel is the traffic manager, but it never knows when traffic is coming, when it's no longer coming, or when to go home unless it is explicitly told. It waits passively for communication and can cause problems if it never receives any.

Go has a select control mechanism, which works just as effectively as a switch statement does, but on channel communication instead of variable values. A switch statement modifies execution based on the value of a variable, and select reacts to actions and communication across a channel. You can use this to orchestrate and arrange the control flow of your application. The following code snippet is our traditional switch, familiar to Go users and common among other languages:

switch letter {

  case 'x':
    // runs when letter == 'x'

  case 'y':
    // runs when letter == 'y'

}

The following code snippet represents the select statement:

select {
  
  case <- channelA:

  case <- channelB:

}

In a switch statement, each case expression represents a value; in select, it represents a communication operation on a channel, such as a receive. A select statement blocks the goroutine running it until one of its cases can proceed (unless it has a default case). If none ever can, and no other goroutine is able to run, the application deadlocks and you'll get an error to that effect.

If more than one case is ready at the same time (for example, if two channels both have data waiting), Go picks one of them using uniform pseudo-random selection, so you can't rely on any particular order.

So, how might this be useful? Let's look at a modified version of the letter capitalization application:

package main

import (
  "fmt"
  "strings"
)

var initialString string
var initialBytes []byte
var stringLength int
var finalString string
var lettersProcessed int
var applicationStatus bool

// getLetters sends each byte of initialBytes, as a one-letter
// string, through the gQ channel.
func getLetters(gQ chan string) {

  for i := range initialBytes {
    gQ <- string(initialBytes[i])

  }

}

// capitalizeLetters receives letters from gQ until every letter has
// been processed. sQ is not used yet in this version.
func capitalizeLetters(gQ chan string, sQ chan string) {

  for {
    if lettersProcessed >= stringLength {
      applicationStatus = false
      break
    }
    select {
      case letter := <- gQ:
        capitalLetter := strings.ToUpper(letter)
        finalString += capitalLetter
        lettersProcessed++
    }
  }
}

func main() {

  applicationStatus = true

  getQueue := make(chan string)
  stackQueue := make(chan string)

  initialString = "Four score and seven years ago our fathers brought forth on this continent, a new nation, conceived in Liberty, and dedicated to the proposition that all men are created equal."
  initialBytes = []byte(initialString)
  stringLength = len(initialString)
  lettersProcessed = 0

  fmt.Println("Let's start capitalizing")

  // getLetters sends from its own goroutine; capitalizeLetters
  // receives here in main until all letters have arrived.
  go getLetters(getQueue)
  capitalizeLetters(getQueue, stackQueue)

  close(getQueue)
  close(stackQueue)

  for {

    if applicationStatus == false {
      fmt.Println("Done")
      fmt.Println(finalString)
      break
    }

  }
}

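The primary difference here is that we now have a channel, getQueue, that carries data between two functions running concurrently: getLetters, which runs in its own goroutine, and capitalizeLetters, which runs in main. At the bottom, you'll see a for{} loop that keeps main active until the applicationStatus variable is set to false; because capitalizeLetters is called directly rather than as a goroutine, that flag is already false when the loop starts, so it exits on its first pass. In the following code, we pass each of these bytes as a string through the Go channel: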

func getLetters(gQ chan string) {

  for i := range initialBytes {
    gQ <- string(initialBytes[i])  

  }

}

The getLetters function is our primary goroutine that fetches individual letters from the byte array constructed from Lincoln's line. As the function iterates through each byte, it sends the letter through the getQueue channel.

On the receiving end, we have capitalizeLetters, which takes each letter as it's sent across the channel, capitalizes it, and appends it to our finalString variable. Let's take a look at this:

func capitalizeLetters(gQ chan string, sQ chan string) {

  for {
    if lettersProcessed >= stringLength {
      applicationStatus = false
      break
    }
    select {
      case letter := <- gQ:
        capitalLetter := strings.ToUpper(letter)
        finalString += capitalLetter
        lettersProcessed++
    }
  }
}

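It's critical that every channel operation eventually finds a partner, or our application will hit a deadlock. If we never broke out of the for loop here, select would keep waiting to receive from getQueue after the last letter had arrived; with no goroutine left to send, the program would deadlock. So we manually check whether we've capitalized all letters and only then break the loop. Closing a channel, as main does afterward, tells any receivers that no more values are coming; it is not what prevents the deadlock here.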
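A common alternative to counting letters is to have the sender close the channel and the receiver range over it. The sketch below is not the approach used above: getLetters and capitalizeLetters are hypothetical reworkings that take their input as parameters, and strings.Builder replaces repeated string concatenation.

```go
package main

import (
	"fmt"
	"strings"
)

// getLetters sends each byte of s as a one-letter string, then closes
// the channel to tell the receiver that no more letters are coming.
func getLetters(s string, gQ chan<- string) {
	defer close(gQ)
	for i := 0; i < len(s); i++ {
		gQ <- string(s[i])
	}
}

// capitalizeLetters receives until gQ is closed. The range loop ends on
// its own, so no letter counter or status flag is needed.
func capitalizeLetters(gQ <-chan string) string {
	var b strings.Builder
	for letter := range gQ {
		b.WriteString(strings.ToUpper(letter))
	}
	return b.String()
}

func main() {
	gQ := make(chan string)
	go getLetters("Four score and seven years ago", gQ)
	fmt.Println(capitalizeLetters(gQ)) // FOUR SCORE AND SEVEN YEARS AGO
}
```

Closing from the sending side is the usual convention, because only the sender knows when it has finished; a receiver that closed the channel could cause a later send to panic.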
