Now that we are familiar with the concepts of concurrency and parallelism, and we understand how to achieve them with Go's concurrency primitives, we can look at some patterns for concurrent work and parallel execution. In this chapter we'll see three of them.
All three patterns describe some sort of logic to synchronize execution in time. Keep in mind that we are now developing concurrent structures with all the tools and patterns we have seen in the previous chapters. With Creational patterns we were dealing with creating objects. With Structural patterns we learned how to build idiomatic structures, and with Behavioral patterns we were mostly dealing with algorithms. Now, with Concurrency patterns, we will mostly manage the timing and order of execution of applications that have more than one flow.
We are going to start with the Barrier pattern. Its purpose is simple: put up a barrier so that nobody passes until we have all the results we need, which is quite common in concurrent applications.
Imagine a microservices application where one service needs to compose its response by merging the responses of three other microservices. This is where the Barrier pattern can help us.
Our Barrier pattern could be a service that blocks its response until it has been composed with the results returned by one or more different Goroutines (or services). And what kind of primitive do we have that has a blocking nature? Well, we could use a lock, but it's more idiomatic in Go to use an unbuffered channel, because a receive on it blocks until another Goroutine sends a value.
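As its name implies, the Barrier pattern tries to stop an execution so it doesn't finish before it's ready to finish. Its objectives are to compose a value from the data coming from one or more Goroutines, and to make sure none of that data is inconsistent: we don't want to return a partially filled result because one of the Goroutines returned an error.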
For our example, we are going to write a very typical situation in a microservices application: an app that performs two HTTP GET calls and joins them into a single response that will be printed on the console.
Our small app must perform each request in a different Goroutine and print the result on the console if both responses are correct. If any of them returns an error, then we print just the error.
The design must be concurrent, allowing us to take advantage of our multicore CPUs to make the calls in parallel.
In the preceding diagram, the solid lines represent calls and the dashed lines represent channels. The balloons are Goroutines, so we have two Goroutines launched by the main function (which could also be considered a Goroutine). These two Goroutines communicate back to the main function through a common channel that they receive as an argument when they are launched with makeRequest.
Our main objective in this app is to get a merged response of two different calls, so we can describe our acceptance criteria like this: the app must print on the console the merged result of the two calls to the http://httpbin.org/headers and http://httpbin.org/User-Agent URLs, and if any of the calls fails, it must print only the error instead of a partial result.
These are a couple of public endpoints that respond with data from the incoming connections. They are very popular for testing purposes. You will need an internet connection to do this exercise.
Writing unit or integration tests for concurrent designs can sometimes be tricky, but this won't stop us from writing our awesome unit tests. We will have a single barrier function that accepts a variadic list of endpoints as strings. The barrier will make a GET request to each endpoint and compose the result before printing it out. In this case, we will write three integration tests against the real endpoints, which keeps our code simple because we don't need to generate mock responses:
package barrier

import (
	"bytes"
	"io"
	"os"
	"strings"
	"testing"
)

// Skeleton only: each subtest body (which uses endpoints and the imports)
// is completed in the following listings.
func TestBarrier(t *testing.T) {
	t.Run("Correct endpoints", func(t *testing.T) {
		endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"}
	})

	t.Run("One endpoint incorrect", func(t *testing.T) {
		endpoints := []string{"http://malformed-url", "http://httpbin.org/User-Agent"}
	})

	t.Run("Very short timeout", func(t *testing.T) {
		endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"}
	})
}
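We have a single test that executes three subtests: one with two correct endpoints, one with an incorrect endpoint, and one with a very short timeout that forces the requests to time out.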
We will have a function called barrier that accepts an undetermined number of endpoints in the form of strings. Its signature could be like this:
func barrier(endpoints ...string) {}
As you can see, the barrier function doesn't return any value because its result will be printed on the console. Previously, we wrote an implementation of the io.Writer interface to emulate writing to the operating system's standard output (stdout). Just to change things a bit, this time we will capture the real stdout instead of emulating it. The process to capture stdout isn't difficult once you understand concurrency primitives in Go:
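func captureBarrierOutput(endpoints ...string) string {
	reader, writer, _ := os.Pipe()

	// Redirect stdout to the pipe, and restore the real one when we are done.
	stdout := os.Stdout
	os.Stdout = writer
	defer func() { os.Stdout = stdout }()

	out := make(chan string)
	go func() {
		// Copy everything written to the pipe until the writer is closed.
		var buf bytes.Buffer
		io.Copy(&buf, reader)
		out <- buf.String()
	}()

	barrier(endpoints...)

	writer.Close() // signals EOF to the copying Goroutine
	temp := <-out
	return temp
}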
Don't feel daunted by this code; it's really simple. First we create a pipe; we did this before in Chapter 3, Structural Patterns - Adapter, Bridge, and Composite Design Patterns, when we talked about the Adapter design pattern. To recall, a pipe connects an io.Writer interface to an io.Reader interface so that whatever is written to the writer can be read from the reader. We assign the writer to os.Stdout, so everything printed to the console goes into the pipe. Then, to capture the stdout output, we need a different Goroutine that reads from the pipe while we write to the console. A single Goroutine can't do both: io.Copy only returns once the writer is closed, so it can't run before the barrier prints, and if we only read after the barrier finished, its writes could block as soon as the pipe's buffer filled up with nobody reading it. The keyword here is while; it is a good rule of thumb that if you find this word in some definition, you'll probably need a concurrent structure. So we use the go keyword to launch a different Goroutine that copies the reader input to a bytes buffer before sending the contents of the buffer through a channel (which we must have created beforehand).
At this point, we have a listening Goroutine, but we haven't printed anything yet, so we call our (not yet written) barrier function with the provided endpoints. Next, we close the writer to signal to the Goroutine that no more input is coming. Receiving from the channel called out blocks execution until a value arrives (the one sent by our launched Goroutine). The last step is to return the contents captured from the console.
OK, so we have a function called captureBarrierOutput that captures everything written to stdout and returns it as a string. We can write our tests now:
t.Run("Correct endpoints", func(t *testing.T) {
	endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"}

	result := captureBarrierOutput(endpoints...)
	// Both texts must be present in the composed output.
	if !strings.Contains(result, "Accept-Encoding") || !strings.Contains(result, "User-Agent") {
		t.Fail()
	}

	t.Log(result)
})
All the tests are very easy to implement. In each of them, captureBarrierOutput calls the barrier function for us, so we pass the endpoints and check the returned result. Our composed response from http://httpbin.org must contain the text Accept-Encoding (from the headers endpoint) and User-Agent (from the User-Agent endpoint). If we don't find both texts, the test fails. For debugging purposes, we log the response so we can check it by passing the -v flag to go test:
t.Run("One endpoint incorrect", func(t *testing.T) {
	endpoints := []string{"http://malformed-url", "http://httpbin.org/User-Agent"}

	result := captureBarrierOutput(endpoints...)
	if !strings.Contains(result, "ERROR") {
		t.Fail()
	}

	t.Log(result)
})
This time we use an incorrect endpoint URL, so the output must contain the error prefixed with the word ERROR, which we will write ourselves in the barrier function.
The last subtest reduces the timeout of the HTTP client to 1 ms, so we force a timeout:
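t.Run("Very short timeout", func(t *testing.T) {
	endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"}

	// Force a timeout, restoring the default afterwards so other tests are unaffected.
	previous := timeoutMilliseconds
	timeoutMilliseconds = 1
	defer func() { timeoutMilliseconds = previous }()

	result := captureBarrierOutput(endpoints...)
	if !strings.Contains(result, "Timeout") {
		t.Fail()
	}

	t.Log(result)
})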
The timeoutMilliseconds variable will be a package variable that we will have to define later during implementation.
We needed to define a package variable called timeoutMilliseconds. Let's start from there:
package barrier

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

var timeoutMilliseconds = 5000 // default HTTP client timeout
The initial timeout is 5 seconds (5,000 milliseconds), and these are the packages we will need in our code.
OK, so we need a function that launches a Goroutine for each endpoint URL. Do you remember how we achieve communication between Goroutines? Exactly: channels! So we will need a channel to handle responses and a channel to handle errors.
But we can simplify it a bit more. We will receive two correct responses, two errors, or a response and an error; in any case, there are always two responses, so we can join errors and responses in a merged type:
type barrierResp struct {
	Err  error
	Resp string
}
So each Goroutine will send back a value of the barrierResp type, with either its Err field or its Resp field set.
The procedure is simple: we create a buffered channel with one slot per endpoint (two in our case) that will receive responses of the barrierResp type, launch both requests, wait for two responses, and then check whether there was any error:
func barrier(endpoints ...string) {
	requestNumber := len(endpoints)

	in := make(chan barrierResp, requestNumber)
	defer close(in)

	responses := make([]barrierResp, requestNumber)

	// Launch one request per endpoint.
	for _, endpoint := range endpoints {
		go makeRequest(in, endpoint)
	}

	// The barrier: wait for exactly one response per endpoint.
	var hasError bool
	for i := 0; i < requestNumber; i++ {
		resp := <-in
		if resp.Err != nil {
			fmt.Println("ERROR:", resp.Err)
			hasError = true
		}
		responses[i] = resp
	}

	// Print the composed result only if every request succeeded.
	if !hasError {
		for _, resp := range responses {
			fmt.Println(resp.Resp)
		}
	}
}
Following the previous description, we create a buffered channel called in, with a capacity equal to the number of endpoints, and defer its closing. Then, we launch a function called makeRequest in a Goroutine for each endpoint, passing it the endpoint and the response channel.
Now we loop once for each endpoint (twice in our case). In the loop, we block execution waiting for data from the in channel. If we find an error, we print it prefixed with the word ERROR, as our tests expect, and set hasError to true. After receiving all the responses, if we haven't found any error (hasError == false), we print every response. Finally, the deferred call closes the channel when barrier returns; this is safe because each makeRequest Goroutine sends exactly one value, and we have already received all of them.
We still lack the makeRequest function:
func makeRequest(out chan<- barrierResp, url string) {
	res := barrierResp{}
	client := http.Client{
		Timeout: time.Duration(timeoutMilliseconds) * time.Millisecond,
	}

	resp, err := client.Get(url)
	if err != nil {
		res.Err = err
		out <- res
		return
	}
	defer resp.Body.Close() // release the connection when we are done

	byt, err := ioutil.ReadAll(resp.Body) // io.ReadAll since Go 1.16
	if err != nil {
		res.Err = err
		out <- res
		return
	}

	res.Resp = string(byt)
	out <- res
}
The makeRequest function is a very straightforward function that accepts a channel to send barrierResp values to and a URL to request. We create an http.Client and set its Timeout field to the value of the timeoutMilliseconds package variable. This is how our tests can change the timeout delay before calling the function. Then, we simply make the GET call, read the response body into a byte slice, and send it through the out channel as a string.
We do all this by filling a variable called res of the barrierResp type. If we find an error while performing the GET request or reading the body of the response, we fill the res.Err field, send it to the out channel (whose opposite side is connected to the original Goroutine), and exit the function (so we don't send two values through the out channel by mistake).
Time to run the tests. Remember that you need an internet connection; without it, at least the first test will fail. We will first try the test that has two correct endpoints:
go test -run=TestBarrier/Correct_endpoints -v .
=== RUN TestBarrier
=== RUN TestBarrier/Correct_endpoints
--- PASS: TestBarrier (0.54s)
--- PASS: TestBarrier/Correct_endpoints (0.54s)
barrier_test.go:20: {
  "headers": {
    "Accept-Encoding": "gzip",
    "Host": "httpbin.org",
    "User-Agent": "Go-http-client/1.1"
  }
}
{
  "User-Agent": "Go-http-client/1.1"
}
ok
Perfect. We have a JSON response with a key, headers, and another JSON response with a key, User-Agent. In our integration tests, we were looking for the strings User-Agent and Accept-Encoding, which are present, so the test has passed successfully.
Now we will run the test that has an incorrect endpoint:
go test -run=TestBarrier/One_endpoint_incorrect -v .
=== RUN TestBarrier
=== RUN TestBarrier/One_endpoint_incorrect
--- PASS: TestBarrier (0.27s)
--- PASS: TestBarrier/One_endpoint_incorrect (0.27s)
barrier_test.go:31: ERROR: Get http://malformed-url: dial tcp: lookup malformed-url: no such host
ok
We can see that http://malformed-url has returned a no such host error. A request to this URL must return a text prefixed with the word ERROR:, as we stated in the acceptance criteria; that's why this test is correct (we don't have a false positive).
In testing, it's very important to understand the concepts of "false positive" and "false negative" tests. A false positive is, roughly, a test that passes when it shouldn't (result: all passed), while a false negative is just the reverse, a test that fails when it shouldn't (result: test failed). For example, we could be testing only that a string is returned when doing the requests, but the returned string could be completely empty! This would lead to a false positive: a test that doesn't fail even when we are checking a behavior that is incorrect on purpose (a request to http://malformed-url).
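To make the distinction concrete, here is a sketch with two hypothetical checks (`weakCheck` and `strongCheck` are illustrative names) applied to output like the one barrier prints for the correct-endpoints case. The weak check only asks for non-empty output, so it also passes on the error output produced for http://malformed-url, a false positive in the sense defined above; the strong check looks for the content we actually expect.

```go
package main

import (
	"fmt"
	"strings"
)

// weakCheck only verifies that something was printed: error text passes too.
func weakCheck(out string) bool {
	return out != ""
}

// strongCheck verifies the content each endpoint is expected to contribute
// and rejects output that reports an error.
func strongCheck(out string) bool {
	return strings.Contains(out, "Accept-Encoding") &&
		strings.Contains(out, "User-Agent") &&
		!strings.Contains(out, "ERROR")
}

func main() {
	errorOutput := "ERROR: Get http://malformed-url: dial tcp: lookup malformed-url: no such host\n"
	fmt.Println(weakCheck(errorOutput))   // true: the weak check passes when it shouldn't
	fmt.Println(strongCheck(errorOutput)) // false: the strong check catches the problem
}
```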
The last test reduces the timeout to 1 ms:
go test -run=TestBarrier/Very_short_timeout -v .
=== RUN TestBarrier
=== RUN TestBarrier/Very_short_timeout
--- PASS: TestBarrier (0.00s)
--- PASS: TestBarrier/Very_short_timeout (0.00s)
barrier_test.go:43: ERROR: Get http://httpbin.org/User-Agent: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
ERROR: Get http://httpbin.org/headers: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
ok
Again, the test passed successfully, and we got two timeout errors. The URLs were correct, but we didn't get a response in less than one millisecond, so the client returned a timeout error.
The Barrier pattern opens the door to microservices programming with its composable nature. As you can imagine, it could also be considered a Structural pattern.
The Barrier pattern is not only useful to make network requests; we could also use it to split some task into multiple Goroutines. For example, an expensive operation could be split into a few smaller operations distributed in different Goroutines to maximize parallelism and achieve better performance.
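As a sketch of that idea (the workload, a sum over a slice, and the name `parallelSum` are illustrative assumptions), each Goroutine sums one chunk and sends its partial result through a channel; the caller acts as the barrier, waiting for every chunk before combining them:

```go
package main

import "fmt"

// parallelSum splits nums into at most `parts` chunks, sums each chunk in its
// own Goroutine, and waits for all partial sums before adding them up.
func parallelSum(nums []int, parts int) int {
	if parts < 1 {
		parts = 1
	}
	partial := make(chan int, parts)
	chunk := (len(nums) + parts - 1) / parts // ceiling division
	launched := 0
	for start := 0; start < len(nums); start += chunk {
		end := start + chunk
		if end > len(nums) {
			end = len(nums)
		}
		launched++
		go func(part []int) {
			s := 0
			for _, n := range part {
				s += n
			}
			partial <- s
		}(nums[start:end])
	}

	// The barrier: no total is returned until every launched Goroutine reports.
	total := 0
	for i := 0; i < launched; i++ {
		total += <-partial
	}
	return total
}

func main() {
	nums := make([]int, 1000)
	for i := range nums {
		nums[i] = i + 1
	}
	fmt.Println(parallelSum(nums, 4)) // 500500
}
```

Counting the Goroutines actually launched, rather than assuming `parts` of them, keeps the barrier correct when there are fewer items than parts.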