A sample Kafka producer

In this section, we will learn how to write a producer that will publish events into the Kafka messaging queue. In the next section, we will process the events published in this section with a Storm topology that reads data from Kafka using KafkaSpout. Perform the following steps to create the producer:

  1. Create a new Maven project with the com.learningstorm group ID and the kafka-producer artifact ID.
  2. Add the following dependencies for Kafka in the pom.xml file:
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.8.0</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>javax.jms</groupId>
          <artifactId>jms</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  3. Add the following build plugin to the pom.xml file; it will let us execute the producer through Maven:
    <build>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>1.2.1</version>
          <executions>
            <execution>
              <goals>
                <goal>exec</goal>
              </goals>
            </execution>
          </executions>
          <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>com.learningstorm.kafka.WordsProducer</mainClass>
          </configuration>
        </plugin>
      </plugins>
    </build>
  4. Now, we will create the WordsProducer class in the com.learningstorm.kafka package. This class will publish each word from the first paragraph of Franz Kafka's Metamorphosis as a single message to the words_topic topic in Kafka. The following is the code of the WordsProducer class, with inline comments explaining each step:
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class WordsProducer {
      public static void main(String[] args) {
        // Build the configuration required for connecting to Kafka
        Properties props = new Properties();
    
        // List of Kafka brokers. The complete list is not required,
        // as the producer will auto-discover the rest of the brokers.
        // Change this to suit your deployment.
        props.put("metadata.broker.list", "localhost:9092");
    
        // Serializer used for sending data to Kafka. Since we are
        // sending strings, we use StringEncoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
    
        // We want acks from Kafka that messages are properly received.
        props.put("request.required.acks", "1");
    
        // Create the producer instance
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
    
        // Split the paragraph into words on whitespace. Note the doubled
        // backslash: "\s" is not a valid escape in a Java string literal.
        for (String word : METAMORPHOSIS_OPENING_PARA.split("\\s+")) {
          // Create a message for the "words_topic" topic containing the word
          KeyedMessage<String, String> data =
              new KeyedMessage<String, String>("words_topic", word);
    
          // Send the message
          producer.send(data);
        }
    
        System.out.println("Produced data");
    
        // close the producer
        producer.close();
      }
    
      // First paragraph of Franz Kafka's Metamorphosis
      private static final String METAMORPHOSIS_OPENING_PARA =
          "One morning, when Gregor Samsa woke from troubled dreams, "
              + "he found himself transformed in his bed into a horrible "
              + "vermin. He lay on his armour-like back, and if he lifted "
              + "his head a little he could see his brown belly, slightly "
              + "domed and divided by arches into stiff sections.";
    }
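    Because the producer tokenizes the paragraph only on whitespace, punctuation stays attached to each word, which is why tokens such as `morning,` and `dreams,` appear as whole messages. The following standalone sketch (the class name and sample string are ours, for illustration only) shows what that tokenization produces:

    ```java
    public class SplitDemo {
      public static void main(String[] args) {
        // Sample text standing in for the opening paragraph
        String text = "One morning, when Gregor Samsa woke from troubled dreams,";

        // "\\s+" matches one or more whitespace characters; the backslash
        // must be doubled inside a Java string literal.
        String[] words = text.split("\\s+");

        // Each token becomes one Kafka message in the producer above
        for (String word : words) {
          System.out.println(word);
        }
      }
    }
    ```

    Running this prints one token per line, with the commas still attached, matching the console consumer output shown later in this section.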
  5. Now, we can run the producer by executing the following command:
    mvn compile exec:java
    

    The following output is displayed:

    Produced data
    
  6. Now, let's verify that the messages have been produced by using Kafka's console consumer. Execute the following command:
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic words_topic --from-beginning
    

    The following output is displayed:

    One
    morning,
    when
    Gregor
    Samsa
    woke
    from
    troubled
    dreams,
    he
    found
    himself
    transformed
    in
    his
    bed
    into
    a
    horrible
    vermin.
    
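
    The words arrive in order here because the KeyedMessage instances were created without a key. When a key is supplied, the Kafka 0.8 default partitioner selects a partition by hashing the key, roughly as sketched below. This is a simplified illustration of the idea, not the actual kafka.producer.DefaultPartitioner source, and the partition count is an assumption for the example:

    ```java
    public class PartitionSketch {
      // Simplified view of hash partitioning: hash the key and take it
      // modulo the number of partitions, so the same key always lands
      // on the same partition.
      static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
      }

      public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for illustration
        for (String word : new String[] {"One", "morning,", "Gregor"}) {
          System.out.println(word + " -> partition "
              + partitionFor(word, numPartitions));
        }
      }
    }
    ```

    Since messages with the same key map to the same partition, keying by word would keep all occurrences of a given word in order on one partition, which matters once a topic has more than one partition.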

So, we are able to produce messages into Kafka. In the next section, we will see how we can use KafkaSpout to read messages from Kafka and process them inside a Storm topology.
