Integrating Kafka with Storm

Now, we will create a Storm topology that will consume messages from the Kafka topic, words_topic, and aggregate words into sentences.

The complete message flow is shown in the following diagram:

The message flow in the example Storm-Kafka integration

We have already seen the WordsProducer class that produces words into the Kafka broker. Now, we will create a Storm topology that reads these words from Kafka and aggregates them into sentences. The application will have one KafkaSpout that reads the messages from Kafka, and two bolts: SentenceBolt, which receives words from KafkaSpout and aggregates them into sentences, and PrinterBolt, which receives those sentences and simply prints them on the output stream. We will run this topology in local mode. Perform the following steps to create the Storm topology:

  1. Create a new Maven project with the com.learningstorm group ID and the kafka-storm-topology artifact ID.
  2. Add the following dependencies for KafkaSpout and Storm in the pom.xml file:
    <!-- Dependency for Storm-Kafka spout  -->
    
    <dependency>
      <groupId>net.wurstmeister.storm</groupId>
      <artifactId>storm-kafka-0.8-plus</artifactId>
      <version>0.4.0</version>
    </dependency>
    
    <!-- Dependency for Storm -->
    
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.0.1</version>
    </dependency>
    
    <!-- Utilities -->
    
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
    
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>15.0</version>
    </dependency>
  3. Add the exec-maven-plugin plugin to the pom.xml file so that we are able to run the topology from the command line in local mode using the following code:
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <classpathScope>compile</classpathScope>
        <mainClass>${main.class}</mainClass>
      </configuration>
    </plugin>
  4. Add the maven-assembly-plugin plugin to the pom.xml file so that we can package the topology to deploy it on Storm using the following code:
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
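
    This plugin builds a single JAR containing all the dependencies when we run mvn clean package. As a sketch (the exact JAR file name depends on your project version; 0.0.1-SNAPSHOT is assumed here), the packaged topology could later be submitted to a real Storm cluster with the storm client, using the cluster-mode variant of KafkaTopology shown after step 8:

    mvn clean package
    storm jar target/kafka-storm-topology-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.learningstorm.kafka.KafkaTopology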
  5. Now, add the repositories for the KafkaSpout dependencies in the pom.xml file:
    <repositories>
      <repository>
        <id>github-releases</id>
        <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
      </repository>
      <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
      </repository>
    </repositories>
  6. Now, we will first create SentenceBolt, which will aggregate the words into sentences. For this, create a class called SentenceBolt in the com.learningstorm.kafka package. The following is the code for the SentenceBolt class with explanation:
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.commons.lang.StringUtils;

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;

    import com.google.common.collect.ImmutableList;

    public class SentenceBolt extends BaseBasicBolt {
      // list used for aggregating the words
      private List<String> words = new ArrayList<String>();

      public void execute(Tuple input, BasicOutputCollector collector) {
        // get the word from the tuple
        String word = input.getString(0);
        if (StringUtils.isBlank(word)) {
          // ignore blank lines
          return;
        }
        System.out.println("Received Word:" + word);
        // add the word to the current list of words
        words.add(word);
        if (word.endsWith(".")) {
          // the word ends with '.', which means this is the end of
          // the sentence, so publish a sentence tuple
          collector.emit(ImmutableList.of((Object) StringUtils.join(words, ' ')));
          // and reset the words list
          words.clear();
        }
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // here we declare that we will be emitting tuples with
        // a single field called "sentence"
        declarer.declare(new Fields("sentence"));
      }
    }
  7. Next is PrinterBolt, which just prints the sentences that are received. Create the PrinterBolt class in the com.learningstorm.kafka package. The following is the code with explanation:
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class PrinterBolt extends BaseBasicBolt {
      public void execute(Tuple input, BasicOutputCollector collector) {
        // get the sentence from the tuple and print it
        String sentence = input.getString(0);
        System.out.println("Received Sentence: " + sentence);
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // we don't emit anything
      }
    }
  8. Now, we will create KafkaTopology, which will define KafkaSpout and wire it with PrinterBolt and SentenceBolt. Create a new KafkaTopology class in the com.learningstorm.kafka package. The following is the code with explanation:
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;

    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class KafkaTopology {
      public static void main(String[] args) throws
          AlreadyAliveException, InvalidTopologyException {
        // ZooKeeper hosts for the Kafka cluster
        ZkHosts zkHosts = new ZkHosts("localhost:2181");
        // Create the KafkaSpout configuration
        // Second argument is the topic name
        // Third argument is the ZooKeeper root for Kafka
        // Fourth argument is the consumer group id
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7");
        // Specify that the Kafka messages are strings
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // We want to consume the topic from the beginning
        // every time we run the topology to help in
        // debugging. In production, this property
        // should be false.
        kafkaConfig.forceFromStart = true;
        // Now we create the topology
        TopologyBuilder builder = new TopologyBuilder();
        // set the kafka spout class
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        // configure the bolts
        builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");
        // create an instance of the LocalCluster class
        // for executing the topology in local mode
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        // Submit the topology for execution
        cluster.submitTopology("KafkaTopology", conf, builder.createTopology());
        try {
          // Wait for some time before exiting
          System.out.println("Waiting to consume from kafka");
          Thread.sleep(10000);
        } catch (Exception exception) {
          System.out.println("Thread interrupted exception : " + exception);
        }
        // kill KafkaTopology
        cluster.killTopology("KafkaTopology");
        // shut down the storm test cluster
        cluster.shutdown();
      }
    }
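
    The preceding class runs the topology in local mode and kills it after ten seconds, which is convenient for development. As a minimal sketch (not part of the original example), submitting the same topology to a real Storm cluster from the JAR packaged in step 4 would only require replacing the LocalCluster block with StormSubmitter:

    // Cluster-mode variant (sketch): requires
    // import backtype.storm.StormSubmitter;
    Config conf = new Config();
    // The topology keeps running on the cluster until it is
    // explicitly killed, for example with 'storm kill KafkaTopology'
    StormSubmitter.submitTopology("KafkaTopology", conf, builder.createTopology());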
  9. Now, we will run the topology. Make sure the Kafka cluster is running and that you have executed the producer from the previous section so that there are messages in Kafka to consume.

    Run the topology by executing the following command:

    mvn clean compile exec:java -Dmain.class=com.learningstorm.kafka.KafkaTopology
    

    This will execute the topology. You should see messages similar to the following output:

    Received Word:One
    Received Word:morning,
    Received Word:when
    Received Word:Gregor
    Received Word:Samsa
    Received Word:woke
    Received Word:from
    Received Word:troubled
    Received Word:dreams,
    Received Word:he
    Received Word:found
    Received Word:himself
    Received Word:transformed
    Received Word:in
    Received Word:his
    Received Word:bed
    Received Word:into
    Received Word:a
    Received Word:horrible
    Received Word:vermin.
    Received Sentence: One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin.
    

So, we were able to consume messages from Kafka and process them in a Storm topology.
