How to do it...

These steps cover writing and running your application:

  1. From your Terminal or console application, create a new directory called ~/projects/go-programming-cookbook/chapter12/kafkaflow and navigate to this directory.
  2. Run this command:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/kafkaflow

You should see a file called go.mod that contains the following:

module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/kafkaflow   
  1. Copy the tests from ~/projects/go-programming-cookbook-original/chapter12/kafkaflow, or use this as an exercise to write some of your own code!
  2. Ensure that Kafka is up and running on localhost:9092.
  3. Create a file called components.go with the following content:
package kafkaflow

import (
"fmt"
"strings"

flow "github.com/trustmaster/goflow"
)

// Upper upper cases the incoming
// stream
type Upper struct {
Val <-chan string
Res chan<- string
}

// Process loops over the input values and writes the upper
// case string version of them to Res
func (e *Upper) Process() {
for val := range e.Val {
e.Res <- strings.ToUpper(val)
}
}

// Printer is a component for printing to stdout
type Printer struct {
flow.Component
Line <-chan string
}

// Process Prints the current line received
func (p *Printer) Process() {
for line := range p.Line {
fmt.Println(line)
}
}
  1. Create a file called network.go with the following content:
package kafkaflow

import "github.com/trustmaster/goflow"

// NewUpperApp wires together the components
func NewUpperApp() *goflow.Graph {
u := goflow.NewGraph()

u.Add("upper", new(Upper))
u.Add("printer", new(Printer))

u.Connect("upper", "Res", "printer", "Line")
u.MapInPort("In", "upper", "Val")

return u
}
  1. Create a file called main.go in a directory named consumer with the following content:
package main

import (
"github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/kafkaflow"
sarama "github.com/Shopify/sarama"
flow "github.com/trustmaster/goflow"
)

func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("example", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer partitionConsumer.Close()

net := kafkaflow.NewUpperApp()

in := make(chan string)
net.SetInPort("In", in)

wait := flow.Run(net)
defer func() {
close(in)
<-wait
}()

for {
msg := <-partitionConsumer.Messages()
in <- string(msg.Value)
}

}
  1. Copy the producer directory from the Using Kafka with Saram recipe.
  2. Run go run ./consumer.
  3. In a separate Terminal from the same directory, run go run ./producer.
  4. In the producer Terminal, you should now see the following:
$ go run ./producer 
2017/05/07 18:24:12 > message "Message 0" sent to partition 0 at offset 0
2017/05/07 18:24:12 > message "Message 1" sent to partition 0 at offset 1
2017/05/07 18:24:12 > message "Message 2" sent to partition 0 at offset 2
2017/05/07 18:24:12 > message "Message 3" sent to partition 0 at offset 3
2017/05/07 18:24:12 > message "Message 4" sent to partition 0 at offset 4
2017/05/07 18:24:12 > message "Message 5" sent to partition 0 at offset 5
2017/05/07 18:24:12 > message "Message 6" sent to partition 0 at offset 6
2017/05/07 18:24:12 > message "Message 7" sent to partition 0 at offset 7
2017/05/07 18:24:12 > message "Message 8" sent to partition 0 at offset 8
2017/05/07 18:24:12 > message "Message 9" sent to partition 0 at offset 9

In the consumer Terminal, you should see the following:

$ go run ./consumer
MESSAGE 0
MESSAGE 1
MESSAGE 2
MESSAGE 3
MESSAGE 4
MESSAGE 5
MESSAGE 6
MESSAGE 7
MESSAGE 8
MESSAGE 9
  1. The go.mod file may be updated, and the go.sum file should now be present in the top-level recipe directory.
  2. If you have copied or written your own tests, go up one directory and run go test. Ensure that all the tests pass.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.119.139.50