In this section, we will learn how to write a producer that will publish events into the Kafka messaging queue. In the next section, we will process the events published in this section with a Storm topology that reads data from Kafka using KafkaSpout
. Perform the following steps to create the producer:
com.learningstorm
group ID and the kafka-producer
artifact ID.pom.xml
file:<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.0</artifactId> <version>0.8.1.1</version> <exclusions> <exclusion> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> </exclusion> <exclusion> <groupId>com.sun.jdmk</groupId> <artifactId>jmxtools</artifactId> </exclusion> <exclusion> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> </exclusion> </exclusions> </dependency>
pom.xml
file; it will execute the producer using Maven:<build> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>com.learningstorm.kafka.WordsProducer</mainClass> </configuration> </plugin> </plugins> </build>
WordsProducer
class in the com.learningstorm.kafka
package. This class will produce each word from the first paragraph of Franz Kafka's Metamorphosis into the words_topic
topic in Kafka as a single message. The following is the code of the WordsProducer
class with explanation:public class WordsProducer { public static void main(String[] args) { // Build the configuration required for connecting to Kafka Properties props = new Properties(); //List of Kafka brokers. Complete list of brokers is not //required as the producer will auto discover the rest of //the brokers. Change this to suit your deployment. props.put("metadata.broker.list", "localhost:9092"); // Serializer used for sending data to kafka. Since we are sending string, // we are using StringEncoder. props.put("serializer.class", "kafka.serializer.StringEncoder"); // We want acks from Kafka that messages are properly received. props.put("request.required.acks", "1"); // Create the producer instance ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); // Now we break each word from the paragraph for (String word : METAMORPHOSIS_OPENING_PARA.split("\s")) { // Create message to be sent to "words_topic" topic with the word KeyedMessage<String, String> data =new KeyedMessage<String, String>("words_topic", word); // Send the message producer.send(data); } System.out.println("Produced data"); // close the producer producer.close(); } // First paragraph from Franz Kafka's Metamorphosis private static String METAMORPHOSIS_OPENING_PARA = "One morning, when Gregor Samsa woke from troubled dreams, " + "he found himself transformed in his bed into a horrible " + "vermin. He lay on his armour-like back, and if he lifted " + "his head a little he could see his brown belly, slightly " + "domed and divided by arches into stiff sections."; }
mvn compile exec:java
The following output is displayed:
Produced data
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic words_topic --from-beginning
The following output is displayed:
One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin.
So, we are able to produce messages into Kafka. In the next section, we will see how we can use KafkaSpout
to read messages from Kafka and process them inside a Storm topology.
3.138.105.255