These steps cover writing and running your application:
- From your Terminal or console application, create a new directory called ~/projects/go-programming-cookbook/chapter12/kafkaflow and navigate to this directory.
- Run this command:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/kafkaflow
You should see a file called go.mod that contains the following:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/kafkaflow
- Copy the tests from ~/projects/go-programming-cookbook-original/chapter12/kafkaflow, or use this as an exercise to write some of your own code!
- Ensure that Kafka is up and running on localhost:9092.
- Create a file called components.go with the following content:
package kafkaflow

import (
	"fmt"
	"strings"

	flow "github.com/trustmaster/goflow"
)

// Upper upper-cases the incoming
// stream
type Upper struct {
	Val <-chan string
	Res chan<- string
}

// Process loops over the input values and writes the
// upper-cased version of each to Res
func (e *Upper) Process() {
	for val := range e.Val {
		e.Res <- strings.ToUpper(val)
	}
}

// Printer is a component for printing to stdout
type Printer struct {
	flow.Component
	Line <-chan string
}

// Process prints each line received
func (p *Printer) Process() {
	for line := range p.Line {
		fmt.Println(line)
	}
}
- Create a file called network.go with the following content:
package kafkaflow

import "github.com/trustmaster/goflow"

// NewUpperApp wires together the components
func NewUpperApp() *goflow.Graph {
	u := goflow.NewGraph()
	u.Add("upper", new(Upper))
	u.Add("printer", new(Printer))
	u.Connect("upper", "Res", "printer", "Line")
	u.MapInPort("In", "upper", "Val")
	return u
}
- Create a file called main.go in a directory named consumer with the following content:
package main

import (
	"github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/kafkaflow"
	sarama "github.com/Shopify/sarama"
	flow "github.com/trustmaster/goflow"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition("example", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partitionConsumer.Close()

	net := kafkaflow.NewUpperApp()

	in := make(chan string)
	net.SetInPort("In", in)

	wait := flow.Run(net)
	defer func() {
		close(in)
		<-wait
	}()

	for {
		msg := <-partitionConsumer.Messages()
		in <- string(msg.Value)
	}
}
- Copy the producer directory from the Using Kafka with Sarama recipe.
- Run go run ./consumer.
- In a separate Terminal from the same directory, run go run ./producer.
- In the producer Terminal, you should now see the following:
$ go run ./producer
2017/05/07 18:24:12 > message "Message 0" sent to partition 0 at offset 0
2017/05/07 18:24:12 > message "Message 1" sent to partition 0 at offset 1
2017/05/07 18:24:12 > message "Message 2" sent to partition 0 at offset 2
2017/05/07 18:24:12 > message "Message 3" sent to partition 0 at offset 3
2017/05/07 18:24:12 > message "Message 4" sent to partition 0 at offset 4
2017/05/07 18:24:12 > message "Message 5" sent to partition 0 at offset 5
2017/05/07 18:24:12 > message "Message 6" sent to partition 0 at offset 6
2017/05/07 18:24:12 > message "Message 7" sent to partition 0 at offset 7
2017/05/07 18:24:12 > message "Message 8" sent to partition 0 at offset 8
2017/05/07 18:24:12 > message "Message 9" sent to partition 0 at offset 9
In the consumer Terminal, you should see the following:
$ go run ./consumer
MESSAGE 0
MESSAGE 1
MESSAGE 2
MESSAGE 3
MESSAGE 4
MESSAGE 5
MESSAGE 6
MESSAGE 7
MESSAGE 8
MESSAGE 9
- The go.mod file may have been updated, and a go.sum file should now be present in the top-level recipe directory.
- If you have copied or written your own tests, go up one directory and run go test. Ensure that all the tests pass.