Multiplexing channels

Multiplexing is the technique of using a single resource to act on multiple signals or actions. It is used extensively in telecommunications and computer networks. We might find ourselves with multiple types of tasks to execute that can only run in mutual exclusion, or that need to work on a shared resource. For this, we use a pattern in Go known as channel multiplexing. Before we look at how to multiplex channels properly, let's try to implement it on our own.

Imagine that we have a set of channels and we want to act on each one as soon as data is sent over it. Here's a naïve approach:

// naiveMultiplexing.go 
package main 
 
import "fmt" 
 
func main() { 
    channels := [5](chan int){ 
        make(chan int), 
        make(chan int), 
        make(chan int), 
        make(chan int), 
        make(chan int), 
    } 
 
    go func() { 
        // Starting to wait on channels 
        for _, chX := range channels { 
            fmt.Println("Receiving from", <- chX) 
        } 
    }() 
 
    for i := 1; i < 6; i++ { 
        fmt.Println("Sending on channel:", i) 
        channels[i] <- 1 
    } 
} 

The output of the preceding program is as follows:

Sending on channel: 1
fatal error: all goroutines are asleep - deadlock!
   
goroutine 1 [chan send]:
main.main()
      /home/entux/Documents/Code/GO-WORKSPACE/src/distributed-go/ch3/naiveSwitch.go:23 +0x2b1
    
goroutine 5 [chan receive]:
main.main.func1(0xc4200160c0, 0xc420016120, 0xc420016180, 0xc4200161e0, 0xc420016240)
      GO-WORKSPACE/src/distributed-go/ch3/naiveSwitch.go:17 +0xba
created by main.main
      GO-WORKSPACE/src/distributed-go/ch3/naiveSwitch.go:19 +0x18b
  

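The goroutine blocks receiving from the first channel, channels[0], but main never sends on it: its loop starts at index 1, and that send blocks because nobody is receiving from channels[1] yet. Both goroutines wait forever, and the runtime reports a deadlock. (The loop would also index past the end of the array at i = 5, but the program never gets that far.) More generally, receiving in a fixed order ties the receiver to the order in which the sender happens to send. Multiplexing lets us wait on multiple channels at once, without blocking on any single one, and act on a message as soon as it is available on any channel.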
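To see that the deadlock comes from the mismatch between the send order and the receive order, rather than from channels themselves, here is a minimal sketch in which both sides walk the channels in the same order. The function names receiveInOrder and sendInOrder are hypothetical and introduced only for this illustration. It works, but only because the sender and the receiver move in lockstep:

```go
// lockstep.go
package main

import "fmt"

// receiveInOrder receives one value from each channel, strictly in slice
// order, and returns the values once every channel has delivered.
func receiveInOrder(channels []chan int) []int {
	received := make([]int, 0, len(channels))
	for _, ch := range channels {
		// Blocks until this particular channel has data, even if a
		// later channel already has a sender waiting.
		received = append(received, <-ch)
	}
	return received
}

// sendInOrder creates n unbuffered channels, starts a receiver goroutine,
// and sends i*10 on channel i, in the same order the receiver expects.
func sendInOrder(n int) []int {
	channels := make([]chan int, n)
	for i := range channels {
		channels[i] = make(chan int)
	}

	result := make(chan []int)
	go func() { result <- receiveInOrder(channels) }()

	for i, ch := range channels {
		ch <- i * 10
	}
	return <-result
}

func main() {
	fmt.Println(sendInOrder(5)) // prints [0 10 20 30 40]
}
```

If the sender skipped a channel or sent in a different order, the receiver would again block on a channel that never gets a message. That limitation is what multiplexing removes.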

The following are some important points to remember when multiplexing on channels:

  • Syntax:
      select {
      case <-ch1:
          // Statements to execute if ch1 receives a message
      case val := <-ch2:
          // Save the message received from ch2 into a variable and
          // execute statements for ch2
      }
  • It is possible that, by the time select is executed, more than one case is ready with a message. When that happens, select does not execute all of the ready cases; it picks one at random, executes it, and then exits the select statement.
  • Handling a single case per select is limiting if we want to react to messages sent on every channel named in the select cases. We can put the select statement inside a for loop so that every message is eventually handled.
  • Even inside a for loop, select still blocks until a message is available on one of the channels. There might be scenarios where we do not wish to block the loop iteration and instead want to perform some "default" action. This can be achieved with a default case in the select statement.
  • Updated syntax based on the preceding two points:
      for {
          select {
          case <-ch1:
              // Statements to execute if ch1 receives a message
          case val := <-ch2:
              // Save the message received from ch2 into a variable and
              // execute statements for ch2
          default:
              // Statements to execute if none of the channels has yet
              // received a message
          }
      }
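  • A buffered channel that already holds messages is ready immediately, so its case competes with every other ready case. The order in which messages from different channels are handled is therefore not guaranteed, although messages within a single channel are still received in the order they were sent.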
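The points above can be put together in a small sketch. The drain function below is a hypothetical helper, not part of the book's code: it loops over a select with a default case, handles whatever is already queued on two buffered channels, and returns as soon as nothing is ready instead of blocking.

```go
// drain.go
package main

import "fmt"

// drain handles every message already queued on nums and words, then
// returns as soon as neither channel has anything ready.
func drain(nums <-chan int, words <-chan string) (int, string) {
	sum := 0
	joined := ""
	for {
		select {
		case n := <-nums:
			sum += n
		case w := <-words:
			// Values from one channel always arrive in send order,
			// even though the choice between channels is random.
			joined += w
		default:
			// No case is ready: return instead of blocking.
			return sum, joined
		}
	}
}

func main() {
	nums := make(chan int, 3)
	words := make(chan string, 2)

	nums <- 1
	nums <- 2
	nums <- 3
	words <- "a"
	words <- "b"

	sum, joined := drain(nums, words)
	fmt.Println(sum, joined) // prints 6 ab
}
```

The interleaving between the two channels can differ from run to run, but the result does not: every queued number is added exactly once, and the strings keep their per-channel order.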

The following is the correct way to multiplex on all the required channels without being blocked on any one of them, while continuing to handle all the messages being sent:

// multiplexing.go 
 
package main 
 
import ( 
    "fmt" 
) 
 
func main() { 
    ch1 := make(chan int) 
    ch2 := make(chan string) 
    ch3 := make(chan int, 3) 
    done := make(chan bool) 
    completed := make(chan bool) 
 
    ch3 <- 1 
    ch3 <- 2 
    ch3 <- 3 
    go func() { 
        for {
            select {
            case <-ch1:
                fmt.Println("Received data from ch1")
            case val := <-ch2:
                fmt.Println(val)
            case c := <-ch3:
                fmt.Println(c)
            case <-done:
                fmt.Println("exiting...")
                completed <- true
                return
            }
        }
    }() 
 
    ch1 <- 100 
    ch2 <- "ch2 msg" 
    // Uncomment the next two lines to avoid leaking the 'select' goroutine!
    //close(done) 
    //<-completed 
} 

The following is one possible output of the preceding program. Because select chooses at random among the ready cases, the order can vary between runs:

1
Received data from ch1
2
3

Unfortunately, there is one flaw in the program: it leaks the goroutine that runs the select loop. This is also pointed out in the comment near the end of the main function. A leak generally happens when a goroutine is still running, or blocked, but we have no way to reach it. Go's garbage collector will not reclaim such a goroutine, even if the program holds no reference to it. Thus, we need a mechanism to stop and return from such goroutines. In general, this can be achieved by creating a channel specifically for telling the goroutine to return.

In the preceding code, we signal the goroutine by closing the done channel and then wait on the completed channel for it to acknowledge. The following is one possible output if we uncomment those lines and then run the program:

1
2
3
Received data from ch1
ch2 msg
exiting...
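The same stop mechanism can be isolated into a small sketch. The names startWorker and stopAfter are hypothetical, and using chan struct{} for the done channel is a choice made here, not something the program above requires. The worker counts the messages it handles and reports the total once done is closed:

```go
// doneChannel.go
package main

import "fmt"

// startWorker launches a goroutine that counts messages from work until
// done is closed, then reports the count on the returned channel and exits.
func startWorker(work <-chan int, done <-chan struct{}) <-chan int {
	completed := make(chan int)
	go func() {
		handled := 0
		for {
			select {
			case <-work:
				handled++
			case <-done:
				// A receive on a closed channel never blocks, so
				// this case becomes ready as soon as done is closed.
				completed <- handled
				return
			}
		}
	}()
	return completed
}

// stopAfter sends n messages to a worker, stops it, and returns the number
// of messages the worker reported handling.
func stopAfter(n int) int {
	work := make(chan int) // unbuffered: each send waits for the worker
	done := make(chan struct{})
	completed := startWorker(work, done)

	for i := 0; i < n; i++ {
		work <- i
	}
	close(done)        // tell the worker to return
	return <-completed // wait until it has actually finished
}

func main() {
	fmt.Println("handled:", stopAfter(3)) // prints handled: 3
}
```

Closing done rather than sending a value on it means the signal reaches every goroutine that selects on the channel, and waiting on completed guarantees the worker has returned before main moves on.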