Chapter 9. Concurrency Patterns - Barrier, Future, and Pipeline Design Patterns

Now that we are familiar with the concepts of concurrency and parallelism, and we have understood how to achieve them by using Go's concurrency primitives, we can look at some patterns for concurrent work and parallel execution. In this chapter we'll cover the following patterns:

  • Barrier is a very common pattern, especially when we have to wait for more than one response from different Goroutines before letting the program continue
  • Future pattern allows us to write an algorithm that will be executed eventually in time (or not) by the same Goroutine or a different one
  • Pipeline is a powerful pattern to build complex synchronous flows of Goroutines that are connected with each other according to some logic

Take a quick look at the descriptions of the three patterns. They all describe some sort of logic to synchronize execution in time. It's very important to keep in mind that we are now developing concurrent structures with all the tools and patterns we have seen in the previous chapters. With Creational patterns we were dealing with creating objects. With Structural patterns we were learning how to build idiomatic structures, and with Behavioral patterns we were mostly dealing with algorithms. Now, with Concurrency patterns, we will mostly manage the timing and order of execution of applications that have more than one flow.

Barrier concurrency pattern

We are going to start with the Barrier pattern. Its purpose is simple: put up a barrier so that nobody passes until we have all the results we need, something quite common in concurrent applications.

Description

Imagine the situation where we have a microservices application where one service needs to compose its response by merging the responses of another three microservices. This is where the Barrier pattern can help us.

Our Barrier pattern could be a service that will block its response until it has been composed with the results returned by one or more different Goroutines (or services). And what kind of primitive do we have that has a blocking nature? Well, we can use a lock, but it's more idiomatic in Go to use an unbuffered channel.
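Before we build the real example, here is a minimal sketch of a channel-based barrier. It assumes the work can be expressed as functions that return an int, and `waitForAll` is a hypothetical name. The receive loop is the barrier: the caller cannot continue until every Goroutine has sent its value.

```go
package main

import "fmt"

// waitForAll launches one Goroutine per task and blocks until every task
// has delivered its result through a shared unbuffered channel.
// waitForAll is a hypothetical helper, not part of the chapter's code.
func waitForAll(tasks ...func() int) []int {
	results := make(chan int) // unbuffered: each send waits for the receiver

	for _, task := range tasks {
		go func(f func() int) {
			results <- f()
		}(task)
	}

	// The barrier itself: nothing is returned until len(tasks) values arrived.
	collected := make([]int, 0, len(tasks))
	for range tasks {
		collected = append(collected, <-results)
	}
	return collected
}

func main() {
	out := waitForAll(
		func() int { return 1 },
		func() int { return 2 },
		func() int { return 3 },
	)

	sum := 0
	for _, v := range out {
		sum += v // arrival order varies between runs, the sum does not
	}
	fmt.Println(len(out), sum) // prints "3 6"
}
```

The channel is unbuffered, so each Goroutine stays blocked on its send until the barrier loop receives from it. As a result, no value is left behind when `waitForAll` returns.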

Objectives

As its name implies, the Barrier pattern tries to stop an execution so it doesn't finish before it's ready to finish. The Barrier pattern's objectives are as follows:

  • Compose the value of a type with the data coming from one or more Goroutines.
  • Control the correctness of any of those incoming data pipes so that no inconsistent data is returned. We don't want a partially filled result because one of the pipes has returned an error.
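Both objectives fit in a small sketch. It assumes each data source can be modeled as a `func() (string, error)`; `composeAll`, `partial` and `errServiceDown` are hypothetical names. The value is composed only when every Goroutine succeeded. Otherwise the caller receives just an error, never a partial result.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errServiceDown is a hypothetical error used to simulate a failing service.
var errServiceDown = errors.New("service down")

// partial carries either a value or an error from one Goroutine,
// plus the position of its source so the result can be ordered.
type partial struct {
	index int
	value string
	err   error
}

// composeAll runs every part concurrently and joins the values in argument
// order. If any part fails, it returns no composed value, only the first
// error that arrived, so callers never see a partially filled result.
func composeAll(parts ...func() (string, error)) (string, error) {
	ch := make(chan partial, len(parts))
	for i, p := range parts {
		go func(i int, p func() (string, error)) {
			v, err := p()
			ch <- partial{index: i, value: v, err: err}
		}(i, p)
	}

	values := make([]string, len(parts))
	var firstErr error
	for range parts { // barrier: wait for every part, even after an error
		r := <-ch
		if r.err != nil && firstErr == nil {
			firstErr = r.err
		}
		values[r.index] = r.value
	}

	if firstErr != nil {
		return "", firstErr
	}
	return strings.Join(values, " "), nil
}

func main() {
	hello := func() (string, error) { return "hello", nil }
	world := func() (string, error) { return "world", nil }
	broken := func() (string, error) { return "", errServiceDown }

	if s, err := composeAll(hello, world); err == nil {
		fmt.Println("composed:", s) // composed: hello world
	}
	if _, err := composeAll(hello, broken); err != nil {
		fmt.Println("error:", err) // error: service down
	}
}
```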

An HTTP GET aggregator

For our example, we are going to write a very typical situation in a microservices application: an app that performs two HTTP GET calls and joins them in a single response that will be printed on the console.

Our small app must perform each request in a different Goroutine and print the result on the console if both responses are correct. If any of them returns an error, then we print just the error.

The design must be concurrent, allowing us to take advantage of our multicore CPUs to make the calls in parallel:

[Figure: An HTTP GET aggregator]

In the preceding diagram, the solid lines represent calls and the dashed lines represent channels. The balloons are Goroutines, so we have two Goroutines launched by the main function (which could also be considered a Goroutine). These two functions will communicate back to the main function by using a common channel that they received when they were created on the makeRequest calls.

Acceptance criteria

Our main objective in this app is to get a merged response of two different calls, so we can describe our acceptance criteria like this:

  • Print on the console the merged result of the two calls to http://httpbin.org/headers and http://httpbin.org/User-Agent URLs. These are a couple of public endpoints that respond with data from the incoming connections. They are very popular for testing purposes. You will need an internet connection to do this exercise.
  • If any of the calls fails, it must not print any result, just the error message (or error messages if both calls failed).
  • The output must be printed as a composed result when both calls have finished. It means that we cannot print the result of one call and then the other.

Unit test - integration

Writing unit or integration tests for concurrent designs can sometimes be tricky, but this won't stop us from writing our awesome unit tests. We will have a single barrier method that accepts a set of endpoints defined as a string type. The barrier will make a GET request to each endpoint and compose the result before printing it out. In this case, we will write three integration tests to keep our code simple, so we don't need to generate mock responses:

package barrier 
 
import ( 
    "bytes" 
    "io" 
    "os" 
    "strings" 
    "testing" 
) 
 
func TestBarrier(t *testing.T) { 
  t.Run("Correct endpoints", func(t *testing.T) { 
    endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"} 
  }) 
 
  t.Run("One endpoint incorrect", func(t *testing.T) { 
    endpoints := []string{"http://malformed-url", "http://httpbin.org/User-Agent"} 
  }) 
 
  t.Run("Very short timeout", func(t *testing.T) { 
    endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"} 
  }) 
} 

We have a single test that will execute three subtests:

  • The first test makes two calls to the correct endpoints
  • The second test will have an incorrect endpoint, so it must return an error
  • The last test will reduce the HTTP client timeout to a minimum so that we can force a timeout error

We will have a function called barrier that will accept a variable number of endpoints in the form of strings. Its signature could look like this:

func barrier(endpoints ...string) {} 

As you can see, the barrier function doesn't return any value because its result will be printed on the console. Previously, we wrote an implementation of the io.Writer interface to emulate writing to the operating system's stdout. Just to change things a bit, we will capture os.Stdout instead of emulating it. Capturing stdout isn't difficult once you understand concurrency primitives in Go:

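func captureBarrierOutput(endpoints ...string) string { 
    reader, writer, err := os.Pipe() 
    if err != nil { 
        panic(err) 
    } 
 
    // Redirect stdout into the pipe, restoring the original on return
    originalStdout := os.Stdout 
    os.Stdout = writer 
    defer func() { os.Stdout = originalStdout }() 
 
    out := make(chan string) 
 
    // Read the pipe concurrently while barrier writes to it
    go func() { 
        var buf bytes.Buffer 
        io.Copy(&buf, reader) 
        out <- buf.String() 
    }() 
 
    barrier(endpoints...) 
 
    // Closing the writer sends EOF to io.Copy in the Goroutine
    writer.Close() 
    temp := <-out 
 
    return temp 
} 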

Don't feel daunted by this code; it's really simple. First we created a pipe. We did this before in Chapter 3, Structural Patterns - Adapter, Bridge, and Composite Design Patterns, when we talked about the Adapter design pattern. To recall, a pipe connects an io.Writer interface to an io.Reader interface so that whatever is written to the writer can be read from the reader. We assign the pipe's writer to os.Stdout. Then, to capture stdout output, we need a different Goroutine that listens while we write to the console: if we write, we don't capture, and if we capture, we are not writing. The keyword here is while. A good rule of thumb is that if you find this word in some definition, you'll probably need a concurrent structure. So we use the go keyword to launch a different Goroutine that copies the reader input to a bytes buffer before sending the contents of the buffer through a channel (that we should have previously created).

At this point, we have a listening Goroutine, but we haven't printed anything yet, so we call our (not yet written) function barrier with the provided endpoints. Next, we have to close the writer to signal the Goroutine that no more input is going to come to it. Our channel called out blocks execution until some value is received (the one sent by our launched Goroutine). The last step is to return the contents captured from the console.
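The same capture technique can be generalized into a reusable helper. This sketch assumes a hypothetical `captureStdout` function that takes the code to run as a `func()`. It also restores the original `os.Stdout` when it returns, so output printed later is not swallowed by a closed pipe. `printGreeting` is a hypothetical stand-in for `barrier`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// captureStdout runs f while os.Stdout is redirected into a pipe and returns
// everything f printed. The original os.Stdout is restored on return.
func captureStdout(f func()) (string, error) {
	reader, writer, err := os.Pipe()
	if err != nil {
		return "", err
	}

	original := os.Stdout
	os.Stdout = writer
	defer func() { os.Stdout = original }()

	out := make(chan string)
	go func() {
		// Drain the pipe while f is writing; a large write could otherwise
		// fill the pipe buffer and block f forever.
		var buf bytes.Buffer
		io.Copy(&buf, reader)
		reader.Close()
		out <- buf.String()
	}()

	f()
	writer.Close() // signals EOF to io.Copy in the Goroutine above
	return <-out, nil
}

// printGreeting is a stand-in for barrier: it just prints one line.
func printGreeting() {
	fmt.Println("hello from the barrier")
}

func main() {
	captured, err := captureStdout(printGreeting)
	if err != nil {
		panic(err)
	}
	fmt.Printf("captured %q\n", captured) // captured "hello from the barrier\n"
}
```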

OK, so we have a function called captureBarrierOutput that will capture the outputs in stdout and return them as a string. We can write our tests now:

t.Run("Correct endpoints", func(t *testing.T) { 
    endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"} 
 
    result := captureBarrierOutput(endpoints...) 
 
    // Both responses must be present in the composed output
    if !strings.Contains(result, "Accept-Encoding") || !strings.Contains(result, "User-Agent") { 
        t.Fail() 
    } 
 
    t.Log(result) 
}) 

All the tests are very easy to implement. Each one calls captureBarrierOutput, which in turn calls the barrier function with the endpoints we pass, and then checks the returned result. The composed response from the two httpbin.org endpoints must contain the texts Accept-Encoding and User-Agent. If we don't find both of them, the test fails. For debugging purposes, we log the response so we can check it with the -v flag of go test. Next is the test with an incorrect endpoint:

t.Run("One endpoint incorrect", func(t *testing.T) { 
    endpoints := []string{"http://malformed-url", "http://httpbin.org/User-Agent"} 
 
    result := captureBarrierOutput(endpoints...) 
 
    // barrier prefixes every error with the word ERROR
    if !strings.Contains(result, "ERROR") { 
        t.Fail() 
    } 
 
    t.Log(result) 
}) 

This time we used an incorrect endpoint URL, so the response must return the error prefixed with the word ERROR that we will write ourselves in the barrier function.

The last test reduces the timeout of the HTTP GET client to 1 ms, so we force a timeout:

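t.Run("Very short timeout", func(t *testing.T) { 
    endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"} 
 
    // Force a timeout, restoring the previous value so other tests are unaffected
    previousTimeout := timeoutMilliseconds 
    timeoutMilliseconds = 1 
    defer func() { timeoutMilliseconds = previousTimeout }() 
 
    result := captureBarrierOutput(endpoints...) 
 
    if !strings.Contains(result, "Timeout") { 
        t.Fail() 
    } 
 
    t.Log(result) 
}) 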

The timeoutMilliseconds variable will be a package variable that we will have to define later during implementation.
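Conditions that combine several negated strings.Contains calls are easy to get wrong, for example by forgetting one of the ! operators. A small helper keeps such assertions readable. `containsAll` is a hypothetical name, not part of the chapter's code.

```go
package main

import (
	"fmt"
	"strings"
)

// containsAll reports whether s contains every one of the given substrings.
func containsAll(s string, subs ...string) bool {
	for _, sub := range subs {
		if !strings.Contains(s, sub) {
			return false // one missing substring is enough to fail
		}
	}
	return true
}

func main() {
	composed := `{"headers": {"Accept-Encoding": "gzip"}}
{"User-Agent": "Go-http-client/1.1"}`
	onlyAgent := `{"User-Agent": "Go-http-client/1.1"}`

	fmt.Println(containsAll(composed, "Accept-Encoding", "User-Agent"))  // true
	fmt.Println(containsAll(onlyAgent, "Accept-Encoding", "User-Agent")) // false
}
```

With this helper, the check in the Correct endpoints test would read `if !containsAll(result, "Accept-Encoding", "User-Agent") { t.Fail() }`.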

Implementation

We need to define a package variable called timeoutMilliseconds. Let's start from there:

package barrier 
 
import ( 
    "fmt" 
    "io/ioutil" 
    "net/http" 
    "time" 
) 
 
var timeoutMilliseconds int = 5000 

The initial timeout delay is 5 seconds (5,000 milliseconds), and these are the packages we will need in our code.

OK, so we need a function that launches a Goroutine for each endpoint URL. Do you remember how we achieve communication between Goroutines? Exactly: channels! So we will need a channel to handle responses and a channel to handle errors.

But we can simplify it a bit more. We will receive two correct responses, two errors, or a response and an error; in any case, there are always two responses, so we can join errors and responses in a merged type:

type barrierResp struct { 
    Err  error 
    Resp string 
} 

So, each Goroutine will send back a value of the barrierResp type, with either the Err field or the Resp field set.

The procedure is simple. We create a buffered channel with one slot per endpoint (two in our case) that will receive responses of the barrierResp type. We launch both requests, wait for two responses, and then check to see if there is any error:

func barrier(endpoints ...string) { 
    requestNumber := len(endpoints) 
 
    in := make(chan barrierResp, requestNumber) 
    defer close(in) 
 
    responses := make([]barrierResp, requestNumber) 
 
    for _, endpoint := range endpoints { 
        go makeRequest(in, endpoint) 
    } 
 
    var hasError bool 
    for i := 0; i < requestNumber; i++ { 
        resp := <-in 
        if resp.Err != nil { 
            fmt.Println("ERROR: ", resp.Err) 
            hasError = true 
        } 
        responses[i] = resp 
    } 
 
    if !hasError { 
        for _, resp := range responses { 
            fmt.Println(resp.Resp) 
        } 
    } 
} 

Following the previous description, we created a buffered channel called in, making it the size of the incoming endpoints, and we deferred channel closing. Then, we launched a function called makeRequest with each endpoint and the response channel.

Then we loop twice, once for each endpoint. In the loop, we block execution waiting for data from the in channel. If we find an error, we print it prefixed with the word ERROR, as our tests expect, and set the hasError variable to true. After two responses, if we haven't found any error (hasError == false), we print every response. Finally, when barrier returns, the deferred call closes the channel.
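The receive loop is not the only way to build the barrier. As an alternative, and a swapped-in technique rather than the chapter's design, sync.WaitGroup can act as the barrier while each Goroutine writes into its own slot of a pre-sized slice. `runAll` and `result` are hypothetical names; `result` mirrors barrierResp.

```go
package main

import (
	"fmt"
	"sync"
)

// result mirrors the chapter's barrierResp: either Resp or Err is set.
type result struct {
	Resp string
	Err  error
}

// runAll starts one Goroutine per job and uses a WaitGroup as the barrier.
// Each Goroutine writes only to its own index, so no mutex is needed.
func runAll(jobs []func() (string, error)) []result {
	results := make([]result, len(jobs))
	var wg sync.WaitGroup
	wg.Add(len(jobs))

	for i, job := range jobs {
		go func(i int, job func() (string, error)) {
			defer wg.Done()
			resp, err := job()
			results[i] = result{Resp: resp, Err: err}
		}(i, job)
	}

	wg.Wait() // blocks until every Goroutine has called Done
	return results
}

func main() {
	jobs := []func() (string, error){
		func() (string, error) { return "headers", nil },
		func() (string, error) { return "user-agent", nil },
	}
	for _, r := range runAll(jobs) {
		fmt.Println(r.Resp, r.Err)
	}
}
```

One difference is worth noting. In barrier, responses[i] is filled in the order in which the responses arrive, whereas runAll keeps each result at the index of its job.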

We still lack the makeRequest function:

func makeRequest(out chan<- barrierResp, url string) { 
    res := barrierResp{} 
    client := http.Client{ 
        Timeout: time.Duration(timeoutMilliseconds) * time.Millisecond, 
    } 
 
    resp, err := client.Get(url) 
    if err != nil { 
        res.Err = err 
        out <- res 
        return 
    } 
    // Release the underlying connection once we are done with the body
    defer resp.Body.Close() 
 
    byt, err := ioutil.ReadAll(resp.Body) 
    if err != nil { 
        res.Err = err 
        out <- res 
        return 
    } 
 
    res.Resp = string(byt) 
    out <- res 
} 

The makeRequest function is a very straightforward function that accepts a channel to send barrierResp values to and a URL to request. We create an http.Client and set its Timeout field to the value of the timeoutMilliseconds package variable. This is how the tests can change the timeout delay before calling the barrier function. Then, we simply make the GET call, read the response body into a byte slice, and send it as a string through the out channel.

We do all this by filling a variable called res of the barrierResp type. If we find an error while performing a GET request or parsing the body of the result, we fill the res.Err field, send it to the out channel (which has the opposite side connected to the original Goroutine), and exit the function (so we don't send two values through the out channel by mistake).
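Another way to guarantee that the function sends exactly one value, whatever path it returns through, is to perform the send in a deferred function. This sketch abstracts the HTTP call behind a `fetch func() ([]byte, error)` parameter so it can run without a network. `makeRequestOnce` and `errEmptyBody` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// barrierResp matches the type defined in the chapter.
type barrierResp struct {
	Err  error
	Resp string
}

// errEmptyBody is a hypothetical error used to show a second early return.
var errEmptyBody = errors.New("empty body")

// makeRequestOnce is a hypothetical variant of makeRequest. fetch stands in
// for the HTTP call; the deferred send runs on every return path, so exactly
// one value is written to out per call.
func makeRequestOnce(out chan<- barrierResp, fetch func() ([]byte, error)) {
	res := barrierResp{}
	defer func() { out <- res }() // the closure sees the final value of res

	body, err := fetch()
	if err != nil {
		res.Err = err
		return
	}
	if len(body) == 0 {
		res.Err = errEmptyBody
		return
	}
	res.Resp = string(body)
}

func main() {
	out := make(chan barrierResp, 2)
	go makeRequestOnce(out, func() ([]byte, error) { return []byte("ok"), nil })
	go makeRequestOnce(out, func() ([]byte, error) { return nil, nil })

	for i := 0; i < 2; i++ {
		r := <-out // arrival order is not guaranteed
		fmt.Printf("resp=%q err=%v\n", r.Resp, r.Err)
	}
}
```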

Time to run the tests. Remember that you need an Internet connection, or the test that expects answers from httpbin.org will fail. We will first try the test that has two correct endpoints:

go test -run=TestBarrier/Correct_endpoints -v .
=== RUN   TestBarrier
=== RUN   TestBarrier/Correct_endpoints
--- PASS: TestBarrier (0.54s)
    --- PASS: TestBarrier/Correct_endpoints (0.54s)
        barrier_test.go:20: {
          "headers": {
            "Accept-Encoding": "gzip",
            "Host": "httpbin.org",
            "User-Agent": "Go-http-client/1.1"
          }
        }
        {
          "User-Agent": "Go-http-client/1.1"
        }

ok

Perfect. We have a JSON response with a headers key and another JSON response with a User-Agent key. In our integration tests, we were looking for the strings User-Agent and Accept-Encoding, which are present, so the test has passed successfully.

Now we will run the test that has an incorrect endpoint:

go test -run=TestBarrier/One_endpoint_incorrect -v .
=== RUN   TestBarrier
=== RUN   TestBarrier/One_endpoint_incorrect
--- PASS: TestBarrier (0.27s)
    --- PASS: TestBarrier/One_endpoint_incorrect (0.27s)
        barrier_test.go:31: ERROR:  Get http://malformed-url: dial tcp: lookup malformed-url: no such host
ok

We can see that we got an error: http://malformed-url returned a no such host error. A request to this URL must produce text prefixed with the word ERROR:, as we stated in the acceptance criteria. That's why this test is correct (we don't have a false positive).

Note

In testing, it's very important to understand the concepts of "false positive" and "false negative" tests. A false positive is roughly a test that passes a condition when it shouldn't (result: all passed), while a false negative is just the reverse (result: test failed). For example, we could be testing that a string is returned when doing the requests, but the returned string could be completely empty! This would lead to a false positive: a test that doesn't fail even when we are checking a behavior that is incorrect on purpose (a request to http://malformed-url).
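The difference can be shown with two assertion functions, `weakCheck` and `strictCheck`, both hypothetical. Every string contains the empty string, so a check like strings.Contains(output, "") can never fail. It passes even for empty output, which is exactly the kind of test that doesn't fail when it should.

```go
package main

import (
	"fmt"
	"strings"
)

// weakCheck can never fail: every string contains the empty string,
// so even an empty output passes.
func weakCheck(output string) bool {
	return strings.Contains(output, "")
}

// strictCheck only passes when the output starts with the "ERROR:" prefix
// that barrier prints for failed requests.
func strictCheck(output string) bool {
	return strings.HasPrefix(output, "ERROR:")
}

func main() {
	empty := ""
	fmt.Println(weakCheck(empty), strictCheck(empty)) // true false
}
```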

The last test reduces the timeout to 1 ms:

go test -run=TestBarrier/Very_short_timeout -v .     
=== RUN   TestBarrier 
=== RUN   TestBarrier/Very_short_timeout 
--- PASS: TestBarrier (0.00s) 
    --- PASS: TestBarrier/Very_short_timeout (0.00s) 
        barrier_test.go:43: ERROR:  Get http://httpbin.org/User-Agent: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers) 
        ERROR:  Get http://httpbin.org/headers: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers) 
                 
ok

Again, the test passed successfully and we got two timeout errors. The URLs were correct, but we didn't get a response in less than one millisecond, so the client returned a timeout error.
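Matching the text Timeout works, but it depends on the wording of the error message. The sketch below shows a sturdier check and assumes Go 1.13 or later for errors.As. Errors returned by http.Client are *url.Error values, which implement net.Error, whose Timeout method reports client timeouts. To avoid depending on httpbin.org, the sketch uses a local server from net/http/httptest. `isTimeout` and `timesOut` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"time"
)

// isTimeout reports whether err is a network timeout. Errors returned by
// http.Client are *url.Error values, which implement net.Error.
func isTimeout(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}

// timesOut starts a local server that waits serverDelay before answering and
// reports whether a client limited to clientTimeout fails with a timeout.
func timesOut(clientTimeout, serverDelay time.Duration) bool {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(serverDelay)
		fmt.Fprint(w, "ok")
	}))
	defer srv.Close() // waits for the sleeping handler to finish

	client := http.Client{Timeout: clientTimeout}
	resp, err := client.Get(srv.URL)
	if err == nil {
		resp.Body.Close()
	}
	return isTimeout(err)
}

func main() {
	fmt.Println(timesOut(10*time.Millisecond, 200*time.Millisecond)) // true
	fmt.Println(timesOut(5*time.Second, 0))                          // false
}
```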

Waiting for responses with the Barrier design pattern

The Barrier pattern opens the door to microservices programming with its composable nature. As you can imagine, it could also be considered a Structural pattern.

The Barrier pattern is not only useful to make network requests; we could also use it to split some task into multiple Goroutines. For example, an expensive operation could be split into a few smaller operations distributed in different Goroutines to maximize parallelism and achieve better performance.
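Here is a sketch of that idea. It assumes the expensive operation is summing a slice of integers, and `parallelSum` is a hypothetical name. The slice is split into chunks, each chunk is summed in its own Goroutine, and the receive loop acts as the barrier that waits for every partial sum.

```go
package main

import "fmt"

// parallelSum splits nums into at most `parts` chunks, sums each chunk in its
// own Goroutine, and waits for every partial sum before adding them up.
func parallelSum(nums []int, parts int) int {
	if parts < 1 {
		parts = 1
	}
	partials := make(chan int, parts) // one slot per possible chunk
	chunk := (len(nums) + parts - 1) / parts

	launched := 0
	for start := 0; start < len(nums); start += chunk {
		end := start + chunk
		if end > len(nums) {
			end = len(nums)
		}
		launched++
		go func(part []int) {
			total := 0
			for _, n := range part {
				total += n
			}
			partials <- total
		}(nums[start:end])
	}

	// Barrier: receive exactly one partial sum per launched Goroutine.
	sum := 0
	for i := 0; i < launched; i++ {
		sum += <-partials
	}
	return sum
}

func main() {
	nums := make([]int, 100)
	for i := range nums {
		nums[i] = i + 1
	}
	fmt.Println(parallelSum(nums, 4)) // 5050
}
```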
