Fan-out

Implementing fan-out is straightforward: the same channel is passed to several stages, and each of them reads from it. Every value sent on the channel is received by exactly one of the readers, so the work is split among them rather than duplicated.

Goroutines compete for CPU time when the runtime schedules them, so if we want a stage to get a larger share, we can run more than one goroutine for that stage of the pipeline, or for a given operation in our application.
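As a minimal sketch of the idea, assuming a plain channel of integers rather than the word pipeline built in the rest of this section, the following program starts several workers that all receive from the same channel. The names fanOut and sums are hypothetical and used only for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts n workers that all receive from the same src channel.
// Each value is delivered to exactly one worker; the returned slice
// holds the sum of the values handled by each worker.
func fanOut(src <-chan int, n int) []int {
	sums := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Every worker ranges over the same channel and
			// competes with the others for the next value.
			for v := range src {
				sums[id] += v // each worker writes only to its own slot
			}
		}(i)
	}
	wg.Wait() // returns once src is closed and drained
	return sums
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= 10; i++ {
			src <- i
		}
	}()
	sums := fanOut(src, 3)
	total := 0
	for _, s := range sums {
		total += s
	}
	fmt.Println("per-worker sums:", sums, "total:", total)
}
```

How the values are split among the workers changes from run to run, because it depends on scheduling, but the total is always 55 since no value is lost or delivered twice.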

Using this approach, we can create a small application that counts how often each word occurs in a piece of text. Let's create an initial producer stage that reads from a reader and, for each line, emits a slice with the words of that line:

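func SourceLineWords(ctx context.Context, r io.ReadCloser) <-chan []string {
	ch := make(chan []string)
	go func() {
		// Release the reader and signal the end of the stream on exit.
		defer func() { r.Close(); close(ch) }()
		b := bytes.Buffer{}
		s := bufio.NewScanner(r) // splits the input into lines by default
		for s.Scan() {
			// Copy the current line into the buffer and split it into words.
			b.Reset()
			b.Write(s.Bytes())
			words := []string{}
			w := bufio.NewScanner(&b)
			w.Split(bufio.ScanWords)
			for w.Scan() {
				words = append(words, w.Text())
			}
			// Send the words, or stop if the context is cancelled.
			select {
			case <-ctx.Done():
				return
			case ch <- words:
			}
		}
	}()
	return ch
}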
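To see in isolation what the inner loop of SourceLineWords does, here is a small sketch that splits a single line into words. The helper name scanWords is hypothetical. The second call uses strings.Fields from the standard library, which is a different technique from the one in the stage above, but gives the same result for this input and may be simpler when the line is already available as a string:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// scanWords splits one line into words with bufio.ScanWords,
// mirroring the inner loop of SourceLineWords.
func scanWords(line string) []string {
	words := []string{}
	w := bufio.NewScanner(strings.NewReader(line))
	w.Split(bufio.ScanWords)
	for w.Scan() {
		words = append(words, w.Text())
	}
	return words
}

func main() {
	line := "Nel mezzo  del cammin"
	fmt.Println(scanWords(line))      // [Nel mezzo del cammin]
	fmt.Println(strings.Fields(line)) // [Nel mezzo del cammin]
}
```

Both approaches treat runs of whitespace as a single separator, so the double space in the sample line does not produce an empty word.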

We can now define another stage that counts the occurrences of the words in each line it receives. This is the stage we will fan out:

func WordOccurrence(ctx context.Context, src <-chan []string) <-chan map[string]int {
	ch := make(chan map[string]int)
	go func() {
		defer close(ch)
		for v := range src {
			// Count the words of a single line.
			count := make(map[string]int)
			for _, s := range v {
				count[s]++
			}
			// Send the partial count, or stop if the context is cancelled.
			select {
			case <-ctx.Done():
				return
			case ch <- count:
			}
		}
	}()
	return ch
}

To use the first stage as a source for more than one instance of the second stage, we just need to create more than one counting stage with the same input channel:

// cantoUno is a string that holds the input text.
ctx, canc := context.WithCancel(context.Background())
defer canc()
// io.NopCloser (Go 1.16+) replaces the deprecated ioutil.NopCloser.
src := SourceLineWords(ctx,
	io.NopCloser(strings.NewReader(cantoUno)))
count1, count2 := WordOccurrence(ctx, src), WordOccurrence(ctx, src)
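Each counting stage emits one partial map per line, so the results from count1 and count2 still have to be combined. The following is one possible sketch of that step, not necessarily how the rest of this chapter does it. The helper mergeCounts and the stand-in stage emit are hypothetical names introduced here for illustration, and context handling is left out to keep the example short:

```go
package main

import (
	"fmt"
	"sync"
)

// mergeCounts is a hypothetical helper (not from the original text).
// It drains every input channel and adds the partial counts together.
func mergeCounts(srcs ...<-chan map[string]int) map[string]int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total = make(map[string]int)
	)
	for _, src := range srcs {
		wg.Add(1)
		go func(src <-chan map[string]int) {
			defer wg.Done()
			for partial := range src {
				mu.Lock() // total is shared by all the readers
				for word, n := range partial {
					total[word] += n
				}
				mu.Unlock()
			}
		}(src)
	}
	wg.Wait() // returns once every input channel is closed
	return total
}

// emit is a stand-in for a counting stage: it sends the given
// maps on a new channel and then closes it.
func emit(maps ...map[string]int) <-chan map[string]int {
	ch := make(chan map[string]int)
	go func() {
		defer close(ch)
		for _, m := range maps {
			ch <- m
		}
	}()
	return ch
}

func main() {
	count1 := emit(map[string]int{"nel": 1, "mezzo": 1})
	count2 := emit(map[string]int{"nel": 2}, map[string]int{"cammin": 1})
	total := mergeCounts(count1, count2)
	fmt.Println(total["nel"], total["mezzo"], total["cammin"]) // 3 1 1
}
```

With the real stages, mergeCounts(count1, count2) would return once both channels are closed, which happens when the source is exhausted or the context is cancelled.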
