Chapter 10. Concurrency Patterns - Workers Pool and Publish/Subscriber Design Patterns

We have reached the final chapter of the book, where we will discuss a couple of patterns with concurrent structures. We will explain every step in detail so you can follow the examples carefully.

The idea is to learn about patterns to design concurrent applications in idiomatic Go. We are using channels and Goroutines heavily, instead of locks or sharing variables.

  • We will look at one way to develop a pool of workers. This is useful to control the number of Goroutines in an execution.
  • The second example is a rewrite of the Observer pattern, which we saw in Chapter 7, Behavioral Patterns - Visitor, State, Mediator, and Observer Design Patterns, written with a concurrent structure. With this example, we'll dig a bit deeper into concurrent structures and look at how they can differ from a common approach.

Workers pool

One problem we may face with some of the previous approaches to concurrency is their unbounded context. We cannot let an app create an unlimited number of Goroutines. Goroutines are light, but the work they perform can be very heavy. A workers pool helps us solve this problem.

Description

With a pool of workers, we want to bound the number of Goroutines available so that we have tighter control over the pool of resources. This is easy to achieve by creating a channel for each worker and having workers with either an idle or busy status. The task can seem daunting, but it's not difficult at all.

Objectives

Creating a Worker pool is all about resource control: CPU, RAM, time, connections, and so on. The workers pool design pattern helps us to do the following:

  • Control access to shared resources using quotas
  • Create a limited amount of Goroutines per app
  • Provide more parallelism capabilities to other concurrent structures

A pool of pipelines

In the previous chapter, we saw how to work with a pipeline. Now we will launch a bounded number of them so that the Go scheduler can try to process requests in parallel. The idea here is to control the number of Goroutines, stop them gracefully when the app has finished, and maximize parallelism using a concurrent structure without race conditions.

The pipeline we will use is similar to the one we used in the previous chapter, where we were generating numbers, raising them to the power of 2, and summing the final results. In this case, we are going to pass strings to which we will append and prefix data.

Acceptance criteria

In business terms, we want something that tells us that a worker has processed a request, appends a predefined ending, and parses incoming data to uppercase:

  1. When making a request with a string value (any), it must be uppercase.
  2. Once the string is uppercase, a predefined text must be appended to it. This text should not be uppercase.
  3. With the previous result, the worker ID must be prefixed to the final string.
  4. The resulting string must be passed to a predefined handler.

We haven't talked about how to do it technically, just what the business wants. With the entire description, we'll at least have workers, requests, and handlers.

Implementation

The very beginning is a request type. According to the description, it must hold the string that will enter the pipeline as well as the handler function:

   // workers_pipeline.go file 
    type Request struct { 
          Data    interface{} 
          Handler RequestHandler 
    } 

Where is the string? We have a Data field of type interface{} so we can use it to pass a string. By using an interface, we can reuse this type for a string, an int, or a struct data type. The receiver is the one who must know how to deal with the incoming interface.

The Handler field has the type RequestHandler, which we haven't defined yet:

type RequestHandler func(interface{}) 

A request handler is any function that accepts an interface{} as its first argument and returns nothing. Again, we see interface{} where we would usually see a string. This is one of the receivers we mentioned previously, and it will need to cast the incoming result.

So, when sending a request, we must fill it with some value in the Data field and implement a handler; for example:

func NewStringRequest(s string, id int, wg *sync.WaitGroup) Request {
    myRequest := Request{
        Data: fmt.Sprintf(s, id),
        Handler: func(i interface{}) {
            defer wg.Done()
            s, ok := i.(string)
            if !ok {
                log.Fatal("Invalid casting to string")
            }
            fmt.Println(s)
        },
    }
    return myRequest
}

The handler is defined using a closure. We check the type of the interface (and we defer the call to the Done() method so it runs on every exit path). If the type cast fails, we log a fatal error; if it succeeds, we simply print the string, although this is where we would usually do something with the result of the operation. We have to use type casting to retrieve the contents of the interface{} (which is a string). This must be done in every step of the pipeline, although it introduces a bit of overhead.

Now we need a type that can handle Request types. Possible implementations are virtually infinite, so it is better to define an interface first:

   // worker.go file 
    type WorkerLauncher interface { 
        LaunchWorker(in chan Request) 
    } 

The WorkerLauncher interface declares only the LaunchWorker(chan Request) method. Any type that implements this interface has to receive a channel of the Request type to satisfy it. This channel of the Request type is the single entry point to the pipeline.

The dispatcher

Now, to launch workers in parallel and handle all the possible incoming channels, we'll need something like a dispatcher:

   // dispatcher.go file 
    type Dispatcher interface { 
        LaunchWorker(w WorkerLauncher) 
        MakeRequest(Request) 
        Stop() 
    } 

A Dispatcher can launch an injected WorkerLauncher type in its own LaunchWorker method. The Dispatcher interface must use the LaunchWorker method of any WorkerLauncher type to initialize a pipeline. This way, we can reuse the Dispatcher interface to launch many kinds of WorkerLauncher.

When using MakeRequest(Request), the Dispatcher interface exposes a nice method to inject a new Request into the workers pool.

Finally, the user must call Stop when all Goroutines must be finished. We must handle graceful shutdown in our apps, and we want to avoid Goroutine leaks.

We have enough interfaces, so let's start with the dispatcher which is a bit less complicated:

    type dispatcher struct { 
        inCh chan Request 
    } 

Our dispatcher structure stores a channel of Request type in one of its fields. This is going to be the single point of entrance for requests in any pipeline. We said that it must implement three methods, as follows:

    func (d *dispatcher) LaunchWorker(w WorkerLauncher) {
        w.LaunchWorker(d.inCh)
    }

    func (d *dispatcher) Stop() {
        close(d.inCh)
    }
 
    func (d *dispatcher) MakeRequest(r Request) { 
        d.inCh <- r 
    } 

In this example, the Dispatcher interface doesn't need to do anything special before launching a worker, so its LaunchWorker method simply executes the LaunchWorker method of the incoming WorkerLauncher, which initiates the worker itself. We have previously defined that a WorkerLauncher type needs only a channel of incoming requests, so that's what we are passing through.

It may seem unnecessary to implement the LaunchWorker method in the Dispatcher interface. In different scenarios, it could be interesting to save running worker IDs in the dispatcher to control which ones are up or down; the idea is to hide launching implementation details. In this case, the Dispatcher interface is merely acting as a Facade design pattern hiding some implementation details from the user.

The second method is Stop. It closes the incoming requests channel, provoking a chain reaction. We saw in the pipeline example that, when closing the incoming channel, each for-range loop within the Goroutines breaks and the Goroutine is also finished. In this case, when closing a shared channel, it will provoke the same reaction, but in every listening Goroutine, so all pipelines will be stopped. Cool, huh?

The MakeRequest implementation is very simple; we just pass the request given in the argument to the channel of incoming requests. The Goroutine will block there forever until the opposite end of the channel retrieves the request. Forever? That seems like a long time if something goes wrong. We can introduce a timeout, as follows:

    func (d *dispatcher) MakeRequest(r Request) { 
        select { 
        case d.inCh <- r: 
        case <-time.After(time.Second * 5): 
            return 
        } 
    } 

If you remember from previous chapters, we can use select to control which operation is performed over a channel. Like a switch statement, only one case is executed. In this case, we have two different operations: sending and receiving.

The first case is a sending operation: try to send this, and it will block until someone takes the value on the opposite side of the channel. Not a huge improvement, then. The second case is a receiving operation; it will be triggered after 5 seconds if the request above couldn't be sent successfully, and the function will return. It would be very convenient to return an error here, but to keep things simple, we will leave it empty.

Finally, in the dispatcher, for convenience, we will define a Dispatcher creator:

    func NewDispatcher(b int) Dispatcher {
        return &dispatcher{
            inCh: make(chan Request, b),
        }
    }

By using this function instead of creating the dispatcher manually, we can simply avoid small mistakes, such as forgetting to initialize the channel field. As you can see, the b argument refers to the buffer size in the channel.

The pipeline

So, our dispatcher is done and we need to develop the pipeline described in the acceptance criteria. First, we need a type to implement the WorkerLauncher type:

   // worker.go file 
    type PreffixSuffixWorker struct { 
        id int 
        prefixS string 
        suffixS string 
    } 
 
    func (w *PreffixSuffixWorker) LaunchWorker(in chan Request) {}

The PreffixSuffixWorker type stores an ID, a string to prefix, and another string to suffix the incoming data of the Request type. So, the values to prefix and append will be static in these fields, and we will take them from there.

We will implement the LaunchWorker method later and begin with each step in the pipeline. According to the first acceptance criterion, the incoming string must be uppercased. So, the uppercase method will be the first step in our pipeline:

    func (w *PreffixSuffixWorker) uppercase(in <-chan Request) <-chan Request { 
        out := make(chan Request) 
 
        go func() { 
            for msg := range in { 
                s, ok := msg.Data.(string) 
              
                if !ok { 
                    msg.Handler(nil) 
                    continue 
                } 
              
                msg.Data = strings.ToUpper(s) 
              
                out <- msg 
            } 
 
            close(out) 
        }() 
 
        return out 
    } 

Good. As in the previous chapter, a step in the pipeline accepts a channel of incoming data and returns a channel of the same type. It takes a very similar approach to the examples we developed there. This time, though, we aren't using package functions: uppercase is part of the PreffixSuffixWorker type, and the incoming data is a struct instead of an int.

The msg variable is of the Request type, and it will have a handler function and data in the form of an interface. The Data field should be a string, so we type cast it before using it. When type casting a value, we receive the value with the requested type plus a true or false flag (represented by the ok variable). If the ok variable is false, the cast could not be done, and we won't send the value down the pipeline; we stop this Request here by sending nil to the handler (which will also provoke a type-casting error there).

Once we have a nice string in the s variable, we can uppercase it and store it again in the Data field to send down the pipeline to the next step. Be aware that the value will be sent as an interface again, so the next step will need to cast it again. This is the downside of using this approach.

With the first step done, let's continue with the second. According to the second acceptance criteria now, a predefined text must be appended. This text is the one stored in the suffixS field:

    func (w *PreffixSuffixWorker) append(in <-chan Request) <-chan Request {
        out := make(chan Request)

        go func() {
            for msg := range in {
                uppercaseString, ok := msg.Data.(string)

                if !ok {
                    msg.Handler(nil)
                    continue
                }

                msg.Data = fmt.Sprintf("%s%s", uppercaseString, w.suffixS)
                out <- msg
            }
            close(out)
        }()

        return out
    }

The append function has the same structure as the uppercase function. It receives and returns a channel of incoming requests, and launches a new Goroutine that iterates over the incoming channel until it is closed. We need to type cast the incoming value, as mentioned previously.

In this step in the pipeline the incoming string is uppercase (after doing a type assertion). To append any text to it, we just need to use the fmt.Sprintf() function, as we have done many times before, which formats a new string with the provided data. In this case, we pass the value of the suffixS field as the second value, to append it to the end of the string.

Just the last step in the pipeline is missing, the prefix operation:

    func (w *PreffixSuffixWorker) prefix(in <-chan Request) { 
        go func() { 
            for msg := range in { 
                uppercasedStringWithSuffix, ok := msg.Data.(string) 
              
                if !ok { 
                    msg.Handler(nil) 
                    continue 
                } 
 
                msg.Handler(fmt.Sprintf("%s%s", w.prefixS, uppercasedStringWithSuffix)) 
            } 
        }() 
    } 

What catches your attention in this function? Yes, it doesn't return any channel. We could have finished this pipeline in two ways. You have probably realized that we are using a Future handler function to execute with the final result of the pipeline. A second approach would be to pass a channel to return the data back to its origin. In some cases, a Future is enough, while in others it could be more convenient to pass a channel so that it can be connected to a different pipeline (for example).

In any case, the structure of a step in a pipeline must be very familiar to you already. We cast the value, check the result of the casting, and send nil to the handler if anything went wrong. But, in case everything was OK, the last thing to do is to format the text again to place the prefixS field at the beginning of the text, to send the resulting string back to the origin by calling the request's handler.

Now, with our worker almost finished, we can implement the LaunchWorker method:

    func (w *PreffixSuffixWorker) LaunchWorker(in chan Request) { 
        w.prefix(w.append(w.uppercase(in))) 
    } 

That's all for workers! We simply pass the returning channels to the next steps in the Pipeline, as we did in the previous chapter. Remember that the pipeline is executed from inside to outside of the calls. So, what's the order of execution of any incoming data to the pipeline?

  1. The data enters the pipeline through the Goroutine launched in the uppercase method.
  2. Then, it goes to the Goroutine launched in append.
  3. Finally, it enters the Goroutine launched in the prefix method, which doesn't return anything but executes the handler after prefixing the incoming string with more data.

Now we have a full pipeline and a dispatcher of pipelines. The dispatcher will launch as many instances of the pipelines as we want to route the incoming requests to any available worker.

If none of the workers takes the request within 5 seconds, the request is lost.

Let's use this library in a small app.

An app using the workers pool

We will launch three workers of our defined pipeline. We use the NewDispatcher function to create the dispatcher and the channel that will receive all requests. This channel has a fixed buffer, which will be able to store up to 100 incoming messages before blocking:

   // workers_pipeline.go 
    func main() { 
        bufferSize := 100 
        var dispatcher Dispatcher = NewDispatcher(bufferSize) 

Then, we will launch the workers by calling the LaunchWorker method in the Dispatcher interface three times with an already filled WorkerLauncher type:

    workers := 3 
    for i := 0; i < workers; i++ { 
        var w WorkerLauncher = &PreffixSuffixWorker{ 
            prefixS: fmt.Sprintf("WorkerID: %d -> ", i), 
            suffixS: " World", 
            id: i, 
        } 
        dispatcher.LaunchWorker(w) 
    } 

Each WorkerLauncher type is an instance of PreffixSuffixWorker. The prefix will be a small text showing the worker ID, and the suffix will be the text " World".

At this point, we have three workers, each running three Goroutines concurrently and waiting for messages to arrive:

    requests := 10 
 
    var wg sync.WaitGroup 
    wg.Add(requests) 

We will make 10 requests. We also need a WaitGroup to properly synchronize the app so that it doesn't exit too early. You will find yourself using WaitGroups quite a lot when dealing with concurrent applications. For 10 requests, we'll need to wait for 10 calls to the Done() method, so we call the Add() method with a delta of 10. It's called a delta because you can also pass, say, -5 later to reduce the count to five pending requests; in some situations, that can be useful:

    for i := 0; i < requests; i++ { 
        req := NewStringRequest("(Msg_id: %d) -> Hello", i, &wg) 
        dispatcher.MakeRequest(req) 
    } 
 
    dispatcher.Stop() 
 
    wg.Wait() 
}

To make requests, we iterate in a for loop. First, we create a Request using the NewStringRequest function that we wrote at the beginning of the Implementation section. In this value, the Data field will be the text we pass down the pipeline; it will be the text that sits "in the middle" of the prefixing and suffixing operations. In this case, we send the message number and the word Hello.

Once we have a request, we call the MakeRequest method with it. After all requests have been done, we stop the dispatcher that, as explained previously, will provoke a chain reaction that will stop all Goroutines in the pipeline.

Finally, we wait for the group so that all calls to the Done() method are received, which signals that all operations have been finished. It's time to try it out:

    go run *.go
    WorkerID: 1 -> (MSG_ID: 0) -> HELLO World
    WorkerID: 0 -> (MSG_ID: 3) -> HELLO World
    WorkerID: 0 -> (MSG_ID: 4) -> HELLO World
    WorkerID: 0 -> (MSG_ID: 5) -> HELLO World
    WorkerID: 2 -> (MSG_ID: 2) -> HELLO World
    WorkerID: 1 -> (MSG_ID: 1) -> HELLO World
    WorkerID: 0 -> (MSG_ID: 6) -> HELLO World
    WorkerID: 2 -> (MSG_ID: 9) -> HELLO World
    WorkerID: 0 -> (MSG_ID: 7) -> HELLO World
    WorkerID: 0 -> (MSG_ID: 8) -> HELLO World

Let's analyze the first message:

  1. The message ID would be zero, so the message sent is (Msg_id: 0) -> Hello.
  2. Then, the text is uppercased, so now we have (MSG_ID: 0) -> HELLO.
  3. After uppercasing, an append operation with the text " World" (note the space at the beginning) is done. This gives us the text (MSG_ID: 0) -> HELLO World.
  4. Finally, the text WorkerID: 1 (in this case, the first worker took the task, but it could be any of them) is prefixed to the text from step 3 to give us the full returned message, WorkerID: 1 -> (MSG_ID: 0) -> HELLO World.

No tests?

Concurrent applications are difficult to test, especially if they perform networking operations. It can be difficult, and code may have to change a lot just to make it testable. In any case, not testing at all is not justifiable, and here it is not especially difficult to test our small app. Create a test and copy the contents of the main function there:

// workers_pipeline_test.go file 
package main 
 
import ( 
    "fmt" 
    "sync" 
    "testing" 
) 
 
func Test_Dispatcher(t *testing.T) { 
    // pasted code from main function 
    bufferSize := 100 
    var dispatcher Dispatcher = NewDispatcher(bufferSize) 
 
    workers := 3 
    for i := 0; i < workers; i++ { 
        var w WorkerLauncher = &PreffixSuffixWorker{ 
            prefixS: fmt.Sprintf("WorkerID: %d -> ", i), 
            suffixS: " World", 
            id:      i, 
        } 
        dispatcher.LaunchWorker(w) 
    } 
 
    // Simulate requests 
    requests := 10 
 
    var wg sync.WaitGroup 
    wg.Add(requests) 
} 

Now we have to rewrite our handler to test that the returned contents are the ones we are expecting. Go to the for loop to modify the function that we are passing as a handler on each Request:

for i := 0; i < requests; i++ { 
    req := Request{ 
        Data: fmt.Sprintf("(Msg_id: %d) -> Hello", i), 
        Handler: func(i interface{}) { 
            defer wg.Done() 
            s, ok := i.(string) 
            if !ok { 
                t.Fail() 
                return 
            } 
 
            ok, err := regexp.Match( 
                `WorkerID: \d* -> \(MSG_ID: \d*\) -> [A-Z]*\sWorld`, 
                []byte(s)) 
 
            if !ok || err != nil { 
                t.Fail() 
            } 
        }, 
    } 
    dispatcher.MakeRequest(req) 
} 

We are going to use regular expressions to test the business rules. If you are not familiar with regular expressions, they are a quite powerful feature that helps you match content within a string. If you remember, in our earlier exercises we used the strings package; Contains is the function to find text inside a string. We can also do that with regular expressions.

The problem is that regular expressions are quite expensive and consume a lot of resources.

We are using the Match function of the regexp package and providing a pattern to match against. Our pattern is `WorkerID: \d* -> \(MSG_ID: \d*\) -> [A-Z]*\sWorld` (without quotes). Specifically, it describes the following:

  • A string with the content WorkerID: \d* -> \(MSG_ID: \d*; here, \d* indicates any digit written zero or more times, so it will match WorkerID: 10 -> (MSG_ID: 1 as well as WorkerID: 1 -> (MSG_ID: 10.
  • \) -> [A-Z]*\sWorld (parentheses must be escaped using backslashes); [A-Z]* means any uppercase character written zero or more times, \s is a white space, and the string must finish with the text World, so ) -> HELLO World will match, but ) -> Hello World won't, because Hello must be all uppercase.

Running this test gives us the following output:

go test -v .
=== RUN   Test_Dispatcher
--- PASS: Test_Dispatcher (0.00s)
PASS
ok

Not bad, but we aren't testing that the code is being executed concurrently, so this is more a business test than a unit test. Concurrency testing would force us to write the code in a completely different manner to check that it creates the proper number of Goroutines and that the pipeline follows the expected workflow. This is not bad, but it's quite complex, and outside the scope of this book.

Wrapping up the Worker pool

With the workers pool, we have our first complex concurrent application that can be used in real-world production systems. It also has room for improvement, but it is a very good design pattern for building bounded concurrent apps.

It is key that we always keep the number of Goroutines being launched under control. While it's easy to launch thousands to achieve more parallelism in an app, we must also be very careful that their code can't hang them in an infinite loop.

With the workers pool, we can now fragment a simple operation into many parallel tasks. Think about it: we could have achieved the same result with one simple call to fmt.Printf, but instead we built a pipeline, launched a few instances of it, and distributed the workload among all those pipes.
