Concurrent Publish/Subscriber design pattern

In this section, we will implement the Observer design pattern that we showed previously on Behavioral patterns, but with a concurrent structure and thread safety.

Description

If you remember from the previous explanation, the Observer pattern maintains a list of observers or subscribers that want to be notified of a particular event. In this case, each subscriber is going to run in a different Goroutine as well as the publisher. We will have new problems with building this structure:

  • Now, the access to the list of subscribers must be serialized. If we are reading the list with one Goroutine, we cannot be removing a subscriber from it or we will have a race.
  • When a subscriber is removed, the subscriber's Goroutine must be closed too, or it will keep iterating forever and we will run into Goroutine leaks.
  • When stopping the publisher, all subscribers must stop their Goroutines, too.

Objectives

The objectives of this publish/subscriber are the same as the ones we wrote on the Observer pattern. The difference here is the way we will develop it. The idea is to make a concurrent structure to achieve the same functionality, which is as follows:

  • Providing an event-driven architecture where one event can trigger one or more actions
  • Uncoupling the actions that are performed from the event that triggers them
  • Providing more than one source event that triggers the same action

The idea is to uncouple senders from receivers, hiding from the sender the identity of the receivers that will process its event, and hiding the receivers from the number of senders that can communicate with them.

In particular, if I develop a click in a button in some application, it could do something (such as log us in somewhere). Weeks later, we might decide to make it show a popup, too. If, every time we want to add some functionality to this button, we have to change the code where it handles the click action, that function will become huge and not very portable to other projects. If we use a publisher and one observer for every action, the click function only needs to publish one single event using a publisher, and we will just write subscribers to this event every time we want to improve the functionality. This is especially important in applications with user interfaces where many things to do in a single UI action can slow the responsiveness of an interface, completely destroying the user experience.

By using a concurrent structure to develop the Observer pattern, a UI cannot feel all the tasks that are being executed in the background if a concurrent structure is defined and the device allows us to execute parallel tasks.

Example - a concurrent notifier

We will develop a notifier similar to the one we developed in  Chapter 7, Behavioral Patterns - Visitor, State, Mediator, and Observer Design Patterns. This is to focus on the concurrent nature of the structure instead of detailing too many things that have already been explained. We have developed an observer already, so we are familiar with the concept.

This particular notifier will work by passing around interface{} values, like in the workers pool example. This way, we can use it for more than a single type by introducing some overhead when casting on the receiver.

We will work with two interfaces now. First, a Subscriber interface:

    type Subscriber interface { 
        Notify(interface{}) error 
        Close() 
    } 

Like in the previous example, it must have a Notify method in the Subscriber interface of new events. This is the Notify method that accepts an interface{} value and returns an error. The Close() method, however, is new, and it must trigger whatever actions are needed to stop the Goroutine where the subscriber is listening for new events.

The second and final interface is the Publisher interface:

    type Publisher interface { 
        start() 
        AddSubscriberCh() chan<- Subscriber 
        RemoveSubscriberCh() chan<- Subscriber 
        PublishingCh() chan<- interface{} 
        Stop() 
    } 

The Publisher interface has the same actions we already know for a publisher but to work with channels. The AddSubscriberCh and RemoveSubscriberCh methods accepts a Subscriber interface (any type that satisfies the Subscriber interface). It must have a method to publish messages and a Stop method to stop them all (publisher and subscriber Goroutines)

Acceptance criteria

Requirements between this example and the one in the Chapter 7 , Behavioral patterns - Visitor, State, Mediator, and Observer Design Patterns must not change. The objective in both examples is the same so the requirements must also be the same. In this case, our requirements are technical, so we actually need to add some more acceptance criteria:

  1. We must have a publisher with a PublishingCh method that returns a channel to send messages through and triggers a Notify method on every observer subscribed.
  2. We must have a method to add new subscribers to the publisher.
  3. We must have a method to remove new subscribers from the publisher.
  4. We must have a method to stop a subscriber.
  5. We must have a method to stop a Publisher interface that will also stop all subscribers.
  6. All inter Goroutine communication must be synchronized so that no Goroutine is locked waiting for a response. In such cases, an error is returned after the specified timeout period has passed.

Well, these criteria seem quite daunting. We have left out some requirements that would add even more complexity, such as removing non-responding subscribers or checks to monitor that the publisher Goroutine is always on.

Unit test

We have mentioned previously that testing concurrent applications can be difficult. With the correct mechanism, it still can be done, so let's see how much we can test without big headaches.

Testing subscriber

Starting with subscribers, which seem to have a more encapsulated functionality, the first subscriber must print incoming messages from the publisher to an io.Writer interface. We have mentioned that the subscriber has an interface with two methods, Notify(interface{}) error and the Close() method:

    // writer_sub.go file 
    package main 
 
    import "errors" 
 
    type writerSubscriber struct { 
        id int 
        Writer io.Writer 
    } 
 
    func (s *writerSubscriber) Notify(msg interface{}) error { 
        return erorrs.NeW("Not implemented yet") 
    } 
    func (s *writerSubscriber) Close() {} 

OK. This is going to be our writer_sub.go file. Create the corresponding test file, called the writer_sub_test.go file:

    package main 
    func TestStdoutPrinter(t *testing.T) { 

Now, the first problem we have is that the functionality prints to the stdout, so there's no return value to check. We can solve it in three ways:

  • Capturing the stdout method.
  • Injecting an io.Writer interface to print to it. This is the preferred solution, as it makes the code more manageable.
  • Redirecting the stdout method to a different file.

We'll take the second approach. Redirection is also a possibility. The os.Stdout is a pointer to an os.File type, so it involves replacing this file with one we control, and reading from it:

    func TestWriter(t *testing.T) { 
        sub := NewWriterSubscriber(0, nil) 

The NewWriterSubscriber subscriber isn't defined yet. It must help in the creation of this particular subscriber, returning a type that satisfies the Subscriber interface, so let's quickly declare it on the writer_sub.go file:

    func NewWriterSubscriber(id int, out io.Writer) Subscriber { 
        return &writerSubscriber{} 
    } 

Ideally, it must accept an ID and an io.Writer interface as the destination for its writes. In this case, we need a custom io.Writer interface for our test, so we'll create a mockWriter on the writer_sub_test.go file for it:

    type mockWriter struct { 
        testingFunc func(string) 
    } 
 
    func (m *mockWriter) Write(p []byte) (n int, err error) { 
        m.testingFunc(string(p)) 
        return len(p), nil 
    } 

The mockWriter structure will accept a testingFunc as one of its fields. This testingFunc field accepts a string that represents the bytes written to the mockWriter structure. To implement an io.Writer interface, we need to define a Write([]byte) (int, error) method. In our definition, we pass the contents of p as a string (remember that we always need to return the bytes read and an error, or not, on every Write method). This approach delegates the definition of testingFunc to the scope of the test.

We are going to call the Notify method on the Subcriber interface, which must write on the io.Writer interface like the mockWriter structure. So, we'll define the testingFunc of a mockWriter structure before calling the Notify method:

    // writer_sub_test.go file 
    func TestPublisher(t *testing.T) { 
        msg := "Hello" 
 
        var wg sync.WaitGroup 
        wg.Add(1) 
 
        stdoutPrinter := sub.(*writerSubscriber) 
        stdoutPrinter.Writer = &mockWriter{ 
            testingFunc: func(res string) { 
                if !strings.Contains(res, msg) { 
                    t.Fatal(fmt.Errorf("Incorrect string: %s", res)) 
                } 
                wg.Done() 
            }, 
        } 

We will send the Hello message. This also means that whatever the Subscriber interface does, it must eventually print the Hello message on the provided io.Writer interface.

So if, eventually, we receive a string on the testing function, we'll need to synchronize with the Subscriber interface to avoid race conditions on tests. That's why we use so much WaitGroup. It's a very handy and easy-to-use type to handle this scenario. One Notify method call will need to wait for one call to the Done() method, so we call the Add(1) method (with one unit).

Ideally, the NewWriterSubscriber function must return an interface, so we need to type assert it to the type we are working with during the test, in this case, the stdoutPrinter method. I have omitted error checking when doing the casting on purpose, just to make things easier. Once we have a writerSubscriber type, we can access its Write field to replace it with the mockWriter structure. We could have directly passed an io.Writer interface on the NewWriterSubscriber function, but we wouldn't cover the scenario where a nil object is passed and it sets the os.Stdout instance to a default value.

So, the testing function will eventually receive a string containing what was written by the subscriber. We just need to check if the received string, the one that the Subscriber interface will receive, prints the word Hello at some point and nothing better that strings.Contains function for it. Everything is defined under the scope of the testing function, so we can use the value of the t object to also signal that the test has failed.

Once we have done the checking, we must call to the Done() method to signal that we have already tested the expected result:

err := sub.Notify(msg) 
if err != nil { 
    t.Fatal(err) 
    } 
 
    wg.Wait() 
    sub.Close() 
} 

We must actually call the Notify and Wait methods for the call to the Done method to check that everything was correct.

Note

Did you realize that we have defined the behavior on tests more or less in reverse? This is very common in concurrent apps. It can be confusing sometimes, as it becomes difficult to know what a function could be doing if we can't follow calls linearly, but you get used to it quite quickly. Instead of thinking "it does this, then this, then that," it's more like "this will be called when executing that." This is also because the order of execution in a concurrent application is unknown until some point, unless we use synchronization primitives (such as WaitGroups and channels) to pause execution at certain moments.

Let's execute the test for this type now:

go test -cover -v -run=TestWriter .
=== RUN   TestWriter
--- FAIL: TestWriter (0.00s)
        writer_sub_test.go:40: Not implemented yet
FAIL
coverage: 6.7% of statements
exit status 1
FAIL

It has exited fast but it has failed. Actually, the call to the Done() method has not been executed, so it would be nice to change the last part of our test to this instead:

err := sub.Notify(msg)
if err != nil {
    wg.Done()
t.Error(err)
        }
        wg.Wait()
sub.Close()
    }  

Now, it doesn't stop execution because we are calling the Error function instead of the Fatal function, but we call the Done() method and the test ends where we prefer it to end, after the Wait() method is called. You can try to run the tests again, but the output will be the same.

Testing publisher

We have already seen a Publisher interface and the type that will satisfy which was the publisher type. The only thing we know for sure is that it will need some way to store subscribers, so it will at least have a Subscribers slice:

    // publisher.go type 
    type publisher struct { 
        subscribers []Subscriber 
    } 

To test the publisher type, we will also need a mock for the Subscriber interface:

    // publisher_test.go 
    type mockSubscriber struct { 
        notifyTestingFunc func(msg interface{}) 
        closeTestingFunc func() 
    } 
 
    func (m *mockSubscriber) Close() { 
        m.closeTestingFunc() 
    } 
 
    func (m *mockSubscriber) Notify(msg interface{}) error { 
        m.notifyTestingFunc(msg) 
        return nil 
    } 

The mockSubscriber type must implement the Subscriber interface, so it must have a Close() and a Notify(interface{}) error method. We can embed an existing type that implements it, such as,  the writerSubscriber, and override just the method that is interesting for us, but we will need to define both, so we won't embed anything.

So, we need to override the Notify and Close methods in this case to call the testing functions stored on the fields of the mockSubscriber type:

    func TestPublisher(t *testing.T) { 
        msg := "Hello" 
 
        p := NewPublisher() 

First of all, we will be sending messages through channels directly, this could lead to potential unwanted deadlocks so the first thing to define is a panic handler for cases such as, sending to close channels or no Goroutines listening on a channel. The message we will send to subscribers is Hello. So, each subscriber that has been received using the channel returned by the AddSubscriberCh method must receive this message. We will also use a New function to create Publishers, called NewPublisher. Change the publisher.go file now to write it:

   // publisher.go file 
    func NewPublisher() Publisher { 
        return &publisher{} 
    } 

Now we'll define the mockSubscriber to add it to the publisher list of known subscribers. Back to the publisher_test.go file:

        var wg sync.WaitGroup 
 
        sub := &mockSubscriber{ 
            notifyTestingFunc: func(msg interface{}) { 
                defer wg.Done() 
 
                s, ok := msg.(string) 
                if !ok { 
                    t.Fatal(errors.New("Could not assert result")) 
                } 
 
                if s != msg { 
                    t.Fail() 
                } 
            }, 
            closeTestingFunc: func() { 
                wg.Done() 
            }, 
        } 

As usual, we start with a WaitGroup. First, testing the function in our subscriber defers a call to the Done() method at the end of its execution. Then it needs to type cast msg variable because it's coming as an interface. Remember that this way, we can use the Publisher interface with many types by introducing the overhead of the type assertion. This is done on line s, ok := msg.(string).

Once we have type cast msg to a string, s, we just need to check if the value received in the subscriber is the same as the value we sent, or fail the test if not:

        p.AddSubscriberCh() <- sub 
        wg.Add(1) 
 
        p.PublishingCh() <- msg 
        wg.Wait() 

We add the mockSubscriber type using the AddSubscriberCh method. We publish our message just after getting ready, by adding one to the WaitGroup, and just before setting the WaitGroup to wait so that the test doesn't continue until the mockSubscriber type calls the Done() method.

Also, we need to check if the number of the Subscriber interface has grown after calling the AddSubscriberCh method, so we'll need to get the concrete instance of publisher on the test:

        pubCon := p.(*publisher) 
        if len(pubCon.subscribers) != 1 { 
            t.Error("Unexpected number of subscribers") 
        } 

Type assertion is our friend today! Once we have the concrete type, we can access the underlying slice of subscribers for the Publisher interface. The number of subscribers must be 1 after calling the AddSubscriberCh method once, or the test will fail. The next step is to check just the opposite--when we remove a Subscriber interface, it must be taken from this list:

   wg.Add(1) 
   p.RemoveSubscriberCh() <- sub 
   wg.Wait() 
 
   //Number of subscribers is restored to zero 
   if len(pubCon.subscribers) != 0 { 
         t.Error("Expected no subscribers") 
   } 
 
   p.Stop() 
}  

The final step in our test is to stop the publisher so no more messages can be sent and all the Goroutines are stopped.

The test is finished, but we can't run tests until the publisher type has all the methods implemented; this must be the final result:

    type publisher struct { 
        subscribers []Subscriber 
        addSubCh    chan Subscriber 
        removeSubCh chan Subscriber 
        in          chan interface{} 
        stop        chan struct{} 
    } 
 
    func (p *publisher) AddSubscriberCh() chan<- Subscriber { 
        return nil 
    } 
 
    func (p *publisher) RemoveSubscriberCh() chan<- Subscriber { 
        return nil 
    } 
 
    func (p *publisher) PublishingCh() chan<- interface{} { 
        return nil 
    } 
 
    func (p *publisher) Stop(){} 

With this empty implementation, nothing good can happen when running the tests:

go test -cover -v -run=TestPublisher .
atal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
testing.(*T).Run(0xc0420780c0, 0x5244c6, 0xd, 0x5335a0, 0xc042037d20)
      /usr/local/go/src/testing/testing.go:647 +0x31d
testing.RunTests.func1(0xc0420780c0)
      /usr/local/go/src/testing/testing.go:793 +0x74
testing.tRunner(0xc0420780c0, 0xc042037e10)
      /usr/local/go/src/testing/testing.go:610 +0x88
testing.RunTests(0x5335b8, 0x5ada40, 0x2, 0x2, 0x40d7e9)
      /usr/local/go/src/testing/testing.go:799 +0x2fc
testing.(*M).Run(0xc042037ed8, 0xc04200a4f0)
      /usr/local/go/src/testing/testing.go:743 +0x8c
main.main()
      go-design-patterns/concurrency_3/pubsub/_test/_testmain.go:56 +0xcd
goroutine 5 [chan send (nil chan)]:
go-design-patterns/concurrency_3/pubsub.TestPublisher(0xc042078180)
      go-design-patterns/concurrency_3/pubsub/publisher_test.go:55 +0x372
testing.tRunner(0xc042078180, 0x5335a0)
      /usr/local/go/src/testing/testing.go:610 +0x88
created by testing.(*T).Run
      /usr/local/go/src/testing/testing.go:646 +0x2f3
exit status 2
FAIL  go-design-patterns/concurrency_3/pubsub   1.587s

Yes it has failed but, it's not a controlled fail at all. This was done on purpose to show a couple of things to be careful of in Go. First of all, the error produced in this test is a fatal error, which usually points to a bug in the code. This is important because while a panic error can be recovered, you cannot do the same with a fatal error.

In this case, the error is telling us the problem: goroutine 5 [chan send (nil chan)], a nil channel so it's actually a bug in our code. How can we solve this? Well, this is also interesting.

The fact that we have a nil channel is caused by the code we wrote to compile unit tests but this particular error won't be raised once the appropriate code is written (because we'll never return a nil channel in this case). We could return a channel that is never use we cause a fatal error with a deadlock, which wouldn't be any progress at all either.

An idiomatic way to solve it would be to return a channel and an error so that you can have an error package with a type implementing the Error interface that returns a specific error such as NoGoroutinesListening or ChannelNotCreated. We have already seen many of this implementations so we'll leave these as an exercise to the reader and we will move forward to maintain focus on the concurrent nature of the chapter.

Nothing surprising there, so we can move to the implementation phase.

Implementation

To recall, the writerSubscriber must receive messages that it will write on a type that satisfies the io.Writer interface.

So, where do we start? Well, each subscriber will run its own Goroutine, and we have seen that the best method to communicate with a Goroutine is a channel. So, we will need a field with a channel in the Subscriber type. We can use the same approach as in pipelines to end with the NewWriterSubscriber function and the writerSubscriber type:

    type writerSubscriber struct { 
        in     chan interface{} 
        id     int 
        Writer io.Writer 
    } 
 
    func NewWriterSubscriber(id int, out io.Writer) Subscriber { 
        if out == nil { 
            out = os.Stdout 
        } 
 
        s := &writerSubscriber{ 
            id:     id, 
            in:     make(chan interface{}), 
            Writer: out, 
        } 
 
        go func(){ 
            for msg := range s.in { 
                fmt.Fprintf(s.Writer, "(W%d): %v
", s.id, msg) 
            } 
        }() 
 
        return s 
    } 

In the first step, if no writer is specified (the out argument is nil), the default io.Writer interface is stdout. Then, we create a new pointer to the writerSubscriber type with the ID passed in the first argument, the value of out (os.Stdout, or whatever came in the argument if it wasn't nil), and a channel called in to maintain the same naming as in previous examples.

Then we launch a new Goroutine; this is the launching mechanism we mentioned. Like in the pipelines, the subscriber will iterate over the in channel every time a new message is received and it will format its contents to a string, which also contains the ID of the current subscriber.

As we learned previously, if the in channel is closed, the for range loop will stop and that particular Goroutine will finish, so the only thing we need to do in the Close method is to actually close the in channel:

    func (s *writerSubscriber) Close() { 
        close(s.in) 
    } 

OK, only the Notify method is left; the Notify method is a convenient method to manage a particular behavior when communicating, and we will use a pattern that is common in many calls:

    func (s *writerSubscriber) Notify(msg interface{}) (err error) { 
        defer func(){ 
            if rec := recover(); rec != nil { 
                err = fmt.Errorf("%#v", rec) 
            } 
        }() 
 
        select { 
        case s.in <- msg: 
        case <-time.After(time.Second): 
            err = fmt.Errorf("Timeout
") 
        } 
 
        return 
    } 

When communicating with a channel, there are two behavior that we must usually control: one is waiting time and the other is when the channel is closed. The deferred function actually works for any panicking error that can occur within the function. If the Goroutine panics, it will still execute the deferred function with the recover() method. The recover() method returns an interface of whatever the error was, so in our case, we set the returning variable error to the formatted value returned by recover (which is an interface). The "%#v" parameter gives us most of the information about any type when formatted to a string. The returned error will be ugly, but it will contain most of the information we can extract about the error. For a closed channel, for example, it will return "send on a closed channel". Well, this seems clear enough.

The second rule is about waiting time. When we send a value over a channel, we will be blocked until another Goroutine takes the value from it (it will happen the same with a filled buffered channel). We don't want to get blocked forever, so we set a timeout period of one second by using a select handler. In short, with select we are saying: either you take the value in less than 1 second or I will discard it and return an error.

We have the Close, Notify, and NewWriterSubscriber methods, so we can try our test again:

go test -run=TestWriter -v .
=== RUN   TestWriter
--- PASS: TestWriter (0.00s)
PASS
ok

Much better now. The Writer has taken the mock writer we wrote on the test and has written to it the value we pass to the Notify method. At the same time, close has probably closed the channel effectively, because the Notify method is returning an error after calling the Close method. One thing to mention is that we can't check if a channel is closed or not without interacting with it; that's why we had to defer the execution of a closure that will check the contents of the recover() function in the Notify method.

Implementing the publisher

OK, the publisher will need also a launching mechanism, but the main problems to deal with are race conditions accessing the subscriber list. We can solve this issue with a Mutex object from the sync package but we have already seen how to use this so we will use channels instead.

When using channels, we will need a channel for each action that can be considered dangerous--add a subscriber, remove a subscriber, retrieve the list of subscribers to Notify method them of a message, and a channel to stop all the subscribers. We also need a channel for incoming messages:

    type publisher struct { 
        subscribers []Subscriber 
        addSubCh    chan Subscriber 
        removeSubCh chan Subscriber 
        in          chan interface{} 
        stop        chan struct{} 
    } 

Names are self-descriptive but, in short, subscribers maintain the list of subscribers; this is the slice that needs multiplexed access. The addSubCh instance is the channel to communicate with when you want to add a new subscriber; that's why it's a channel of subscribers. The same explanation applies to the removeSubCh channel, but this channel is to remove the subscriber. The in channel will handle incoming messages that must be broadcast to all subscribers. Finally, the stop channel must be called when we want to kill all Goroutines.

OK, let's start with the AddSubscriberCh, RemoveSubscriber and PublishingCh methods, which must return the channel to add and remove subscribers and the channel to send messages to all of them:

    func (p *publisher) AddSubscriber() { 
        return p.addSubCh 
    } 
 
    func (p *publisher) RemoveSubscriberCh() { 
        return p.removeSubCh 
    } 

    func (p *publisher) PublishMessage(){ 
        return p.in 
    } 

The Stop() function the stop channel by closing it. This will effectively spread the signal to every listening Goroutine:

func (p *publisher) Stop(){ 
  close(p.stop) 
} 

The Stop method, the function to stop the publisher and the subscribers, also pushes to its respective channel, called stop.

You may be wondering why we don't simply leave the channels available so that users push directly to this channel instead of using the proxying function. Well, the idea is that the user that integrates the library in their app doesn't have to deal with the complexity of the concurrent structure associated with the library, so they can focus on their business while maximizing performance as much as possible.

Handling channels without race conditions

Until now, we have forwarded data to the channels on the publisher but we haven't actually handled any of that data. The launcher mechanism that is going to launch a different Goroutine will handle them all.

We will create a launch method that we will execute by using the go keyword instead of embedding the whole function inside the NewPublisher function:

func (p *publisher) start() { 
  for { 
    select { 
    case msg := <-p.in: 
      for _, ch := range p.subscribers { 
        sub.Notify(msg) 
      } 

Launch is a private method and we haven't tested it. Remember that private methods are usually called from public methods (the ones we have tested). Generally, if a private method is not called from a public method, it can't be called at all!

The first thing we notice with this method is that it is an infinite for loop that will repeat a select operation between many channels but only one of them can be executed each time. The first of these operations is the one that receives a new message to publish to subscribers. The case msg := <- p.in: code handles this incoming operation.

In this case, we are iterating over all subscribers and executing their Notify method. You may be wondering why we don't add the go keyword in front so that the Notify method is executed as a different Goroutine and therefore iterates much faster. Well, this because we aren't demultiplexing the actions of receiving a message and of closing the message. So, if we launch the subscriber in a new Goroutine and it is closed while the message is processed in the Notify method, we'll have a race condition where a message will try to be sent within the Notify method to a closed channel. In fact, we are considering this scenario when we develop the Notify method but, still, we won't control the number of Goroutines launched if we call the Notify method in a new Goroutine each time. For simplicity, we just call the Notify method, but it is a nice exercise to control the number of Goroutines waiting for a return in a Notify method execution. By buffering the in channel in each subscriber, we can also achieve a good solution:

    case sub := <-p.addSubCh: 
    p.subscribers = append(p.subscribers, sub) 

The next operation is what to do when a value arrives to the channel to add subscribers. In this case it's simple: we update it, appending the new value to it. While this case is executed, not other calls can be executed in this selection:

     case sub := <-p.removeSubCh: 
     for i, candidate := range p.subscribers { 
         if candidate == sub { 
             p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) 
             candidate.Close() 
             break 
        } 
    } 

When a value arrives at the remove channel, the operation is a bit more complex because we have to search for the subscriber in the slice. We use a O(N) approach for it, iterating from the beginning until we find it, but the search algorithm could be greatly improved. Once we find the corresponding Subscriber interface, we remove it from the subscribers slice and stop it. One thing to mention is that on tests, we are accessing the length of the subscribers slice directly without demultiplexing the operation. This is clearly a race condition, but generally, it isn't reflected when running the race detector.

The solution will be to develop a method just to multiplex calls to get the length of the slice, but it won't belong to the public interface. Again, for simplicity, we'll leave it like this, or this example may become too complex to handle:

    case <-p.stop: 
    for _, sub := range p.subscribers { 
        sub.Close() 
            } 
 
        close(p.addSubCh) 
        close(p.in) 
        close(p.removeSubCh) 
 
        return 
        } 
    } 
} 

The last operation to demultiplex is the stop operation, which must stop all Goroutines in the publisher and subscribers. Then we have to iterate through every Subscriber stored in the subscribers field to execute their Close() method, so their Goroutines are closed, too. Finally, if we return this Goroutine, it will finish, too.

OK, time to execute all tests and see how is it going:

go test -race .
ok

Not so bad. All tests have passed successfully and we have our Observer pattern ready. While the example can still be improved, it is a great example of how we must handle an Observer pattern using channels in Go. As an exercise, we encourage you to try the same example using mutexes instead of channels to control access. It's a bit easier, and will also give you an insight of how to work with mutexes.

A few words on the concurrent Observer pattern

This example has demonstrated how to take advantage of multi-core CPUs to build a concurrent message publisher by implementing the Observer pattern. While the example was long, we have tried to show a common pattern when developing concurrent apps in Go.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.117.168.46