Now, we will create a Storm topology that consumes messages from a Kafka topic, words_topic, and aggregates the words into sentences.
The complete message flow is shown in the following diagram:
We have already seen the WordsProducer class that produces words into the Kafka broker. Now, we will create a Storm topology that reads these words from Kafka and aggregates them into sentences. For this, the application will have one KafkaSpout that reads the messages from Kafka, and two bolts: SentenceBolt, which receives words from KafkaSpout and aggregates them into sentences that are then passed on to PrinterBolt, which simply prints them on the output stream. We will run this topology in local mode. Perform the following steps to create the Storm topology:
Create a new Maven project with the com.learningstorm group ID and the kafka-storm-topology artifact ID. Then, add the following dependencies for KafkaSpout and Storm in the pom.xml file:

```xml
<!-- Dependency for the Storm-Kafka spout -->
<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>
<!-- Dependency for Storm -->
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
</dependency>
<!-- Utilities -->
<dependency>
  <groupId>commons-collections</groupId>
  <artifactId>commons-collections</artifactId>
  <version>3.2.1</version>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>15.0</version>
</dependency>
```
Next, add the exec-maven-plugin plugin to the pom.xml file so that we are able to run the topology from the command line in local mode:

```xml
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <executable>java</executable>
    <includeProjectDependencies>true</includeProjectDependencies>
    <includePluginDependencies>false</includePluginDependencies>
    <classpathScope>compile</classpathScope>
    <mainClass>${main.class}</mainClass>
  </configuration>
</plugin>
```
Next, add the maven-assembly-plugin plugin to the pom.xml file so that we can package the topology to deploy it on Storm:

```xml
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass></mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```
Next, add the following repositories, which are needed to resolve the KafkaSpout dependencies, in the pom.xml file:

```xml
<repositories>
  <repository>
    <id>github-releases</id>
    <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
  </repository>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
```
Now, we will write SentenceBolt, which will aggregate the words into sentences. For this, create a class called SentenceBolt in the com.learningstorm.kafka package. The following is the code for the SentenceBolt class, with explanations in the comments:

```java
package com.learningstorm.kafka;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import com.google.common.collect.ImmutableList;

public class SentenceBolt extends BaseBasicBolt {

    // list used for aggregating the words
    private List<String> words = new ArrayList<String>();

    public void execute(Tuple input, BasicOutputCollector collector) {
        // Get the word from the tuple
        String word = input.getString(0);
        if (StringUtils.isBlank(word)) {
            // ignore blank lines
            return;
        }
        System.out.println("Received Word:" + word);
        // add word to the current list of words
        words.add(word);
        if (word.endsWith(".")) {
            // word ends with '.', which means this is the end of
            // the sentence, so publish a sentence tuple
            collector.emit(ImmutableList.of((Object) StringUtils.join(words, ' ')));
            // and reset the words list
            words.clear();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // here we declare that we will be emitting tuples with
        // a single field called "sentence"
        declarer.declare(new Fields("sentence"));
    }
}
```
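Since the sentence-building rule lives entirely in the execute() method, it can be sanity-checked without a Storm cluster. The following is a minimal, dependency-free sketch of the same aggregation logic; the SentenceAggregator class and its feed() method are illustrative names, not part of the topology code:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of SentenceBolt's aggregation rule (no Storm
// dependencies). Words are buffered until one ends with '.', at which
// point the buffered words are joined into a sentence and the buffer
// is reset, mirroring what SentenceBolt does per tuple.
public class SentenceAggregator {
    private final List<String> words = new ArrayList<String>();

    // Returns the completed sentence when 'word' ends with '.',
    // or null while the sentence is still being assembled.
    public String feed(String word) {
        if (word == null || word.trim().isEmpty()) {
            return null; // ignore blank input, as SentenceBolt does
        }
        words.add(word);
        if (word.endsWith(".")) {
            String sentence = String.join(" ", words);
            words.clear();
            return sentence;
        }
        return null;
    }
}
```

Feeding it "Hello" followed by "world." yields null and then "Hello world.", which is exactly the tuple SentenceBolt would emit.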
Next, we will write PrinterBolt, which just prints the sentences that it receives. Create the PrinterBolt class in the com.learningstorm.kafka package. The following is the code with explanation:

```java
package com.learningstorm.kafka;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        // get the sentence from the tuple and print it
        String sentence = input.getString(0);
        System.out.println("Received Sentence: " + sentence);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // we don't emit anything
    }
}
```
Finally, we will write KafkaTopology, which defines KafkaSpout and wires it with SentenceBolt and PrinterBolt. Create a new KafkaTopology class in the com.learningstorm.kafka package. The following is the code with explanation:

```java
package com.learningstorm.kafka;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaTopology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // ZooKeeper hosts for the Kafka cluster
        ZkHosts zkHosts = new ZkHosts("localhost:2181");

        // Create the KafkaSpout configuration.
        // The second argument is the topic name,
        // the third argument is the ZooKeeper root for Kafka, and
        // the fourth argument is the consumer group ID.
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7");

        // Specify that the Kafka messages are strings
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // We want to consume all the messages in the topic from the
        // beginning every time we run the topology, to help in
        // debugging. In production, this property should be false.
        kafkaConfig.forceFromStart = true;

        // Now we create the topology
        TopologyBuilder builder = new TopologyBuilder();

        // set the Kafka spout class
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);

        // configure the bolts
        builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");

        // create an instance of the LocalCluster class
        // for executing the topology in local mode
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();

        // Submit the topology for execution
        cluster.submitTopology("KafkaTopology", conf, builder.createTopology());

        try {
            // Wait for some time before exiting
            System.out.println("Waiting to consume from kafka");
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }

        // kill KafkaTopology
        cluster.killTopology("KafkaTopology");

        // shut down the Storm test cluster
        cluster.shutdown();
    }
}
```
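To see what TopologyBuilder wires up, the two-stage chain can be replayed as a dependency-free sketch, where a word list stands in for the Kafka topic and the returned list stands in for what PrinterBolt writes to the console (the TopologyFlowSketch name is illustrative, not part of the book's code):

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch of the KafkaSpout -> SentenceBolt -> PrinterBolt
// dataflow. Each word flows through the sentence buffer; completed
// sentences are "printed" into the result list.
public class TopologyFlowSketch {
    public static List<String> run(List<String> wordsFromKafka) {
        List<String> printed = new ArrayList<String>();
        List<String> buffer = new ArrayList<String>(); // SentenceBolt's state
        for (String word : wordsFromKafka) {           // the "spout" replaying the topic
            buffer.add(word);
            if (word.endsWith(".")) {                  // end of sentence: emit downstream
                printed.add("Received Sentence: " + String.join(" ", buffer));
                buffer.clear();
            }
        }
        return printed;
    }
}
```

Running it on the words "Hi", "there.", "Bye." produces the two sentence lines that PrinterBolt would print in the real topology.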
Run the topology by executing the following command:

```shell
mvn clean compile exec:java -Dmain.class=com.learningstorm.kafka.KafkaTopology
```
This will execute the topology. You should see messages similar to the following output:

```
Received Word:One
Received Word:morning,
Received Word:when
Received Word:Gregor
Received Word:Samsa
Received Word:woke
Received Word:from
Received Word:troubled
Received Word:dreams,
Received Word:he
Received Word:found
Received Word:himself
Received Word:transformed
Received Word:in
Received Word:his
Received Word:bed
Received Word:into
Received Word:a
Received Word:horrible
Received Word:vermin.
Received Sentence: One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin.
```
So, we were able to consume messages from Kafka and process them in a Storm topology.