How to do it...

These steps cover writing and running your application:

  1. From your Terminal or console application, create a new directory called ~/projects/go-programming-cookbook/chapter12/asynckafka and navigate to this directory.
  2. Run this command:
$ go mod init github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/asynckafka

You should see a file called go.mod that contains the following:

module github.com/PacktPublishing/Go-Programming-Cookbook-Second-Edition/chapter12/asynckafka   
  1. Copy the tests from ~/projects/go-programming-cookbook-original/chapter12/asynckafka, or use this as an exercise to write some of your own code!
  2. Ensure that Kafka is up and running on localhost:9092.
  3. Copy the consumer directory from the previous recipe.
  4. Create a directory named producer and navigate to it.
  5. Create a file called producer.go with the following content:
        package main

import (
"log"

sarama "github.com/Shopify/sarama"
)

// Process response grabs results and errors from a producer
// asynchronously
func ProcessResponse(producer sarama.AsyncProducer) {
for {
select {
case result := <-producer.Successes():
log.Printf("> message: "%s" sent to partition
%d at offset %d ", result.Value,
result.Partition, result.Offset)
case err := <-producer.Errors():
log.Println("Failed to produce message", err)
}
}
}
  1. Create a file called handler.go with the following content:
        package main

import (
"net/http"

sarama "github.com/Shopify/sarama"
)

// KafkaController allows us to attach a producer
// to our handlers
type KafkaController struct {
producer sarama.AsyncProducer
}

// Handler grabs a message from a GET parama and
// send it to the kafka queue asynchronously
func (c *KafkaController) Handler(w http.ResponseWriter, r
*http.Request) {
if err := r.ParseForm(); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}

msg := r.FormValue("msg")
if msg == "" {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("msg must be set"))
return
}
c.producer.Input() <- &sarama.ProducerMessage{Topic:
"example", Key: nil, Value:
sarama.StringEncoder(msg)}
w.WriteHeader(http.StatusOK)
}
  1. Create a file called main.go with the following content:
        package main

import (
"fmt"
"net/http"

sarama "github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
producer, err :=
sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.AsyncClose()

go ProcessResponse(producer)

c := KafkaController{producer}
http.HandleFunc("/", c.Handler)
fmt.Println("Listening on port :3333")
panic(http.ListenAndServe(":3333", nil))
}
  1. Navigate up a directory.
  2. Run go run ./consumer.
  3. In a separate Terminal from the same directory, run go run ./producer.
  4. In a third Terminal, run the following commands:
$ curl "http://localhost:3333/?msg=this"      
$ curl "http://localhost:3333/?msg=is"

$ curl "http://localhost:3333/?msg=an"
$ curl "http://localhost:3333/?msg=example"

In the producer Terminal, you should see the following:

$ go run ./producer
Listening on port :3333
2017/05/07 13:52:54 > message: "this" sent to partition 0 at offset 0
2017/05/07 13:53:25 > message: "is" sent to partition 0 at offset 1
2017/05/07 13:53:27 > message: "an" sent to partition 0 at offset 2
2017/05/07 13:53:29 > message: "example" sent to partition 0 at offset 3
  1. In the consumer Terminal, you should see this:
$ go run ./consumer
2017/05/07 13:52:54 Consumed message: "this" at offset: 0
2017/05/07 13:53:25 Consumed message: "is" at offset: 1
2017/05/07 13:53:27 Consumed message: "an" at offset: 2
2017/05/07 13:53:29 Consumed message: "example" at offset: 3
  1. The go.mod file may be updated, and the go.sum file should now be present in the top-level recipe directory.
  2. If you have copied or written your own tests, go up one directory and run go test. Ensure that all tests pass.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.86.134