Defining a topology and the Kafka spout

This section explains how to read the server log entries from a Kafka topic. We will use the Kafka spout available on GitHub at https://github.com/wurstmeister/storm-kafka-0.8-plus to consume the data from Kafka. This section also defines the LogProcessingTopology topology, which chains together all the bolts created in the preceding sections. Let's perform the following steps to consume the data from Kafka and define the topology:

  1. Add the following dependency for the Kafka spout in pom.xml (the repository entry that it needs is shown after the snippet):
        <dependency>
          <groupId>net.wurstmeister.storm</groupId>
          <artifactId>storm-kafka-0.8-plus</artifactId>
          <version>0.4.0</version>
        </dependency>
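    The storm-kafka-0.8-plus artifact is hosted on the Clojars repository rather than on Maven Central, so pom.xml also needs a repository entry. A minimal entry, assuming the clojars.org URL given in the project's README, looks like this:
        <repository>
          <id>clojars.org</id>
          <url>http://clojars.org/repo</url>
        </repository>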
  2. Add the following build plugins to pom.xml. These plugins compile LogProcessingTopology and package it, together with all of its dependencies, into a single JAR, as explained after the snippet:
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
              <source>1.6</source>
              <target>1.6</target>
            </configuration>
          </plugin>
    
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.2.1</version>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass />
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
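    With these plugins in place, running mvn clean package from the project's home directory compiles the topology and, since the assembly plugin is bound to the package phase, produces a bundled JAR ending in jar-with-dependencies.jar in the target directory; the exact name depends on the artifactId and version in your pom.xml. This JAR contains the topology and all of its dependencies, so it can be submitted to a Storm cluster as is.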
  3. Let's create a LogProcessingTopology class in the com.learningstorm.stormlogprocessing package. This class uses the backtype.storm.topology.TopologyBuilder class to define the topology. The following is the source code of the LogProcessingTopology class with its explanation:
    package com.learningstorm.stormlogprocessing;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;

    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class LogProcessingTopology {
      public static void main(String[] args) throws Exception {
    
        // zookeeper hosts for the Kafka cluster
        ZkHosts zkHosts = new ZkHosts("localhost:2181");
    
        // Create the KafkaSpout configuration
        // The second argument is the Kafka topic name
        // The third argument is the root path in ZooKeeper
        // where the spout stores the consumed offsets
        // The fourth argument is the spout ID, which is used
        // to store the consumer offsets in ZooKeeper
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "apache_log", "", "id");

        // Specify that the Kafka messages are plain strings
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
        // We want to consume the topic from the beginning
        // every time we run the topology, which helps while
        // debugging. In production, this property should be
        // false so that the spout resumes from the offsets
        // stored in ZooKeeper
        kafkaConfig.forceFromStart = true;
    
        // Now we create the topology
        TopologyBuilder builder = new TopologyBuilder();
    
        // set the kafka spout class
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
    
        // set the LogSplitter, IpToCountry, Keyword, and
        // PersistenceBolt bolts
        builder.setBolt("LogSplitter", new ApacheLogSplitterBolt(), 1)
            .globalGrouping("KafkaSpout");
        builder.setBolt("IpToCountry", new UserInformationGetterBolt("./src/main/resources/GeoLiteCity.dat"), 1)
            .globalGrouping("LogSplitter");
        builder.setBolt("Keyword", new KeyWordIdentifierBolt(), 1).globalGrouping("IpToCountry");
        builder.setBolt("PersistenceBolt", new PersistenceBolt("localhost", "apachelog", "root", "root"), 1).globalGrouping("Keyword");
    
        if (args != null && args.length > 0) {
          // Run the topology on remote cluster.
          Config conf = new Config();
          conf.setNumWorkers(4);
          try {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
          } catch (AlreadyAliveException alreadyAliveException) {
            System.out.println(alreadyAliveException);
          } catch (InvalidTopologyException invalidTopologyException) {
            System.out.println(invalidTopologyException);
          }
        } else {
          // create an instance of the LocalCluster class
          // for executing the topology in the local mode.
          LocalCluster cluster = new LocalCluster();
          Config conf = new Config();
    
          // Submit topology for execution
          cluster.submitTopology("KafkaToplogy", conf, builder.createTopology());
    
          try {
            // Wait for some time so that the topology can
            // consume and process messages from Kafka
            System.out.println("**********************Waiting to consume from kafka");
            Thread.sleep(10000);
          } catch (InterruptedException exception) {
            System.out.println("******************Thread interrupted exception : " + exception);
          }
    
          // kill the KafkaTopology
          cluster.killTopology("KafkaTopology");
    
          // shut down the storm test cluster
          cluster.shutdown();
    
        }
    
      }
    }
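
    Note that every bolt in this chain subscribes with globalGrouping, which routes all the tuples of the subscribed stream to a single task of the bolt; with parallelism hints of 1, this simply chains the components one after another. If you later raise the parallelism hints, shuffleGrouping distributes the tuples evenly across a bolt's tasks instead. The following is a minimal sketch, not part of the listing above, showing the first two links of the chain rewritten this way with a parallelism hint of 2:
        // distribute the Kafka tuples evenly across two
        // LogSplitter tasks
        builder.setBolt("LogSplitter", new ApacheLogSplitterBolt(), 2)
            .shuffleGrouping("KafkaSpout");

        // distribute the split log lines evenly across two
        // IpToCountry tasks
        builder.setBolt("IpToCountry",
            new UserInformationGetterBolt("./src/main/resources/GeoLiteCity.dat"), 2)
            .shuffleGrouping("LogSplitter");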

This section covered how to chain the different types of bolts together into a topology, and how to consume data from Kafka using the Kafka spout. In the next section, we will learn how to deploy the topology.
