These steps cover writing and running your application:
- From your Terminal or console application, create a new directory called ~/projects/go-programming-cookbook/chapter12/asynckafka and navigate to this directory.
- Run this command:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/asynckafka
You should see a file called go.mod that contains the following:
module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/asynckafka
- Copy the tests from ~/projects/go-programming-cookbook-original/chapter12/asynckafka, or use this as an exercise to write some of your own code!
- Ensure that Kafka is up and running on localhost:9092.
- Copy the consumer directory from the previous recipe.
- Create a directory named producer and navigate to it.
- Create a file called producer.go with the following content:
package main

import (
	"log"

	sarama "github.com/Shopify/sarama"
)

// ProcessResponse grabs results and errors from a producer
// asynchronously
func ProcessResponse(producer sarama.AsyncProducer) {
	for {
		select {
		case result := <-producer.Successes():
			log.Printf("> message: \"%s\" sent to partition %d at offset %d\n", result.Value, result.Partition, result.Offset)
		case err := <-producer.Errors():
			log.Println("Failed to produce message", err)
		}
	}
}
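ProcessResponse loops forever over the producer's two result channels. The sketch below shows the same select pattern with plain Go channels standing in for Successes() and Errors(), plus one assumed addition: it returns once both channels have been closed. The name drain and the channel element types are illustrative and are not part of sarama or this recipe.

```go
package main

import "fmt"

// drain is a hypothetical stand-in for ProcessResponse. It reads from two
// channels until both are closed and reports how many values each delivered.
func drain(successes <-chan string, errs <-chan error) (sent, failed int) {
	for successes != nil || errs != nil {
		select {
		case msg, ok := <-successes:
			if !ok {
				successes = nil // a nil channel is never selected again
				continue
			}
			fmt.Printf("> message: %q sent\n", msg)
			sent++
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Println("Failed to produce message:", err)
			failed++
		}
	}
	return sent, failed
}

func main() {
	successes := make(chan string, 2)
	errs := make(chan error, 1)
	successes <- "this"
	successes <- "is"
	errs <- fmt.Errorf("broker unavailable")
	close(successes)
	close(errs)

	sent, failed := drain(successes, errs)
	fmt.Printf("sent=%d failed=%d\n", sent, failed)
}
```

Setting a closed channel to nil removes its case from the select, so the loop ends cleanly instead of spinning on zero values. Whether the recipe needs this depends on how the producer is shut down; here the goroutine simply ends with the process.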
- Create a file called handler.go with the following content:
package main

import (
	"net/http"

	sarama "github.com/Shopify/sarama"
)

// KafkaController allows us to attach a producer
// to our handlers
type KafkaController struct {
	producer sarama.AsyncProducer
}

// Handler grabs a message from a GET parameter and
// sends it to the Kafka queue asynchronously
func (c *KafkaController) Handler(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseForm(); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	msg := r.FormValue("msg")
	if msg == "" {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("msg must be set"))
		return
	}

	c.producer.Input() <- &sarama.ProducerMessage{Topic: "example", Key: nil, Value: sarama.StringEncoder(msg)}
	w.WriteHeader(http.StatusOK)
}
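The handler's validation can be exercised without a running Kafka broker. The following is a minimal sketch using net/http/httptest, under the assumption that a plain string channel replaces the producer's Input() channel; the controller type and the status helper are hypothetical names introduced only for this illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// controller is a hypothetical stand-in for KafkaController: a plain string
// channel takes the place of the sarama producer's Input() channel.
type controller struct {
	input chan<- string
}

// Handler rejects requests without a msg parameter and queues the rest.
func (c *controller) Handler(w http.ResponseWriter, r *http.Request) {
	msg := r.FormValue("msg")
	if msg == "" {
		http.Error(w, "msg must be set", http.StatusBadRequest)
		return
	}
	c.input <- msg
	w.WriteHeader(http.StatusOK)
}

// status sends a GET request for target through the handler and returns
// the recorded HTTP status code.
func status(c *controller, target string) int {
	rec := httptest.NewRecorder()
	c.Handler(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Code
}

func main() {
	queue := make(chan string, 1)
	c := &controller{input: queue}

	fmt.Println(status(c, "/"))           // 400
	fmt.Println(status(c, "/?msg=hello")) // 200
	fmt.Println(<-queue)                  // hello
}
```

The same approach could back the tests mentioned at the start of the recipe if the controller accepted an interface rather than a concrete producer; that refactoring is a suggestion, not something the recipe does.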
- Create a file called main.go with the following content:
package main

import (
	"fmt"
	"net/http"

	sarama "github.com/Shopify/sarama"
)

func main() {
	// Report both successes and errors on the producer's channels;
	// ProcessResponse reads from both.
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.AsyncClose()

	// Log delivery results in the background while serving requests.
	go ProcessResponse(producer)

	c := KafkaController{producer}
	http.HandleFunc("/", c.Handler)
	fmt.Println("Listening on port :3333")
	panic(http.ListenAndServe(":3333", nil))
}
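main.go connects three pieces: a producer that accepts messages on an input channel, a background goroutine that reads delivery results, and an HTTP handler that feeds the input. The sketch below imitates that flow with the standard library only, so it runs without Kafka. The fakeProducer and result types are hypothetical, and the sequential offsets are an assumption that mirrors a single-partition topic.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the acknowledgement a producer
// reports for each delivered message.
type result struct {
	Value  string
	Offset int64
}

// fakeProducer mimics the shape of an async producer with plain channels:
// callers write to input and read acknowledgements from successes.
type fakeProducer struct {
	input     chan string
	successes chan result
}

func newFakeProducer() *fakeProducer {
	p := &fakeProducer{
		input:     make(chan string),
		successes: make(chan result),
	}
	go func() {
		defer close(p.successes)
		var offset int64
		for msg := range p.input {
			p.successes <- result{Value: msg, Offset: offset}
			offset++
		}
	}()
	return p
}

// produce sends every message from one goroutine while the caller's
// goroutine collects the acknowledgements.
func produce(msgs []string) []result {
	p := newFakeProducer()
	go func() {
		for _, m := range msgs {
			p.input <- m
		}
		close(p.input)
	}()
	var acks []result
	for r := range p.successes {
		acks = append(acks, r)
	}
	return acks
}

func main() {
	for _, r := range produce([]string{"this", "is", "an", "example"}) {
		fmt.Printf("> message: %q sent at offset %d\n", r.Value, r.Offset)
	}
}
```

Sending and receiving happen in separate goroutines, which is why the recipe starts ProcessResponse before it begins serving requests: a send with nobody reading the results would eventually block.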
- Navigate up a directory.
- Run go run ./consumer.
- In a separate Terminal from the same directory, run go run ./producer.
- In a third Terminal, run the following commands:
$ curl "http://localhost:3333/?msg=this"
$ curl "http://localhost:3333/?msg=is"
$ curl "http://localhost:3333/?msg=an"
$ curl "http://localhost:3333/?msg=example"
In the producer Terminal, you should see the following:
$ go run ./producer
Listening on port :3333
2017/05/07 13:52:54 > message: "this" sent to partition 0 at offset 0
2017/05/07 13:53:25 > message: "is" sent to partition 0 at offset 1
2017/05/07 13:53:27 > message: "an" sent to partition 0 at offset 2
2017/05/07 13:53:29 > message: "example" sent to partition 0 at offset 3
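The curl commands above use single words, so no escaping is needed. For messages that contain spaces or characters such as &, the query string has to be encoded. The sketch below builds the request URL with net/url; messageURL is a hypothetical helper, and the base address assumes the producer from this recipe is listening on localhost:3333.

```go
package main

import (
	"fmt"
	"net/url"
)

// messageURL is a hypothetical helper that builds the producer URL for msg,
// encoding it as the msg query parameter.
func messageURL(base, msg string) string {
	q := url.Values{}
	q.Set("msg", msg)
	return base + "/?" + q.Encode()
}

func main() {
	base := "http://localhost:3333" // assumes the recipe's producer address
	for _, msg := range []string{"this", "is", "an", "example", "hello world"} {
		fmt.Println(messageURL(base, msg))
	}
}
```

Passing the returned URL to http.Get sends the same request that curl does; the last line printed is http://localhost:3333/?msg=hello+world.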
- In the consumer Terminal, you should see this:
$ go run ./consumer
2017/05/07 13:52:54 Consumed message: "this" at offset: 0
2017/05/07 13:53:25 Consumed message: "is" at offset: 1
2017/05/07 13:53:27 Consumed message: "an" at offset: 2
2017/05/07 13:53:29 Consumed message: "example" at offset: 3
- The go.mod file may be updated, and the go.sum file should now be present in the top-level recipe directory.
- If you have copied or written your own tests, go up one directory and run go test. Ensure that all tests pass.