This section explains how to read the server log from a Kafka topic. We will use the Kafka spout integration available on GitHub at https://github.com/wurstmeister/storm-kafka-0.8-plus to consume the data from Kafka. This section also defines the LogProcessingTopology topology, which chains together all the bolts created in the preceding sections. Let's perform the following steps to consume the data from Kafka and define the topology:
1. Add the following dependency to the pom.xml file:

   <dependency>
     <groupId>net.wurstmeister.storm</groupId>
     <artifactId>storm-kafka-0.8-plus</artifactId>
     <version>0.4.0</version>
   </dependency>
2. Add the following build plugins to the pom.xml file. These plugins will let us execute LogProcessingTopology using Maven:

   <build>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.5.1</version>
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
         </configuration>
       </plugin>
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <version>2.2.1</version>
         <configuration>
           <descriptorRefs>
             <descriptorRef>jar-with-dependencies</descriptorRef>
           </descriptorRefs>
           <archive>
             <manifest>
               <mainClass />
             </manifest>
           </archive>
         </configuration>
         <executions>
           <execution>
             <id>make-assembly</id>
             <phase>package</phase>
             <goals>
               <goal>single</goal>
             </goals>
           </execution>
         </executions>
       </plugin>
     </plugins>
   </build>
3. Create the LogProcessingTopology class in the com.learningstorm.stormlogprocessing package. This class uses the backtype.storm.topology.TopologyBuilder class to define the topology. The following is the source code of the LogProcessingTopology class with its explanation:

   public class LogProcessingTopology {
       public static void main(String[] args) throws Exception {
           // ZooKeeper hosts for the Kafka cluster
           ZkHosts zkHosts = new ZkHosts("localhost:2181");

           // Create the KafkaSpout configuration.
           // The second argument is the topic name,
           // the third argument is the ZooKeeper root for Kafka,
           // and the fourth argument is the consumer group id.
           SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "apache_log", "", "id");

           // Specify that the Kafka messages are strings
           kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

           // We want to consume the topic from the first message
           // every time we run the topology, to help in debugging.
           // In production, this property should be false.
           kafkaConfig.forceFromStart = true;

           // Now we create the topology
           TopologyBuilder builder = new TopologyBuilder();

           // Set the Kafka spout class
           builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);

           // Set the LogSplitter, IpToCountry, Keyword,
           // and PersistenceBolt bolt classes
           builder.setBolt("LogSplitter", new ApacheLogSplitterBolt(), 1)
                  .globalGrouping("KafkaSpout");
           builder.setBolt("IpToCountry",
                  new UserInformationGetterBolt("./src/main/resources/GeoLiteCity.dat"), 1)
                  .globalGrouping("LogSplitter");
           builder.setBolt("Keyword", new KeyWordIdentifierBolt(), 1)
                  .globalGrouping("IpToCountry");
           builder.setBolt("PersistenceBolt",
                  new PersistenceBolt("localhost", "apachelog", "root", "root"), 1)
                  .globalGrouping("Keyword");

           if (args != null && args.length > 0) {
               // Run the topology on a remote cluster
               Config conf = new Config();
               conf.setNumWorkers(4);
               try {
                   StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
               } catch (AlreadyAliveException alreadyAliveException) {
                   System.out.println(alreadyAliveException);
               } catch (InvalidTopologyException invalidTopologyException) {
                   System.out.println(invalidTopologyException);
               }
           } else {
               // Create an instance of the LocalCluster class
               // for executing the topology in the local mode
               LocalCluster cluster = new LocalCluster();
               Config conf = new Config();

               // Submit the topology for execution
               cluster.submitTopology("KafkaTopology", conf, builder.createTopology());

               try {
                   // Wait for some time before exiting
                   System.out.println("**********************Waiting to consume from kafka");
                   Thread.sleep(10000);
               } catch (Exception exception) {
                   System.out.println("******************Thread interrupted exception : " + exception);
               }

               // Kill KafkaTopology
               cluster.killTopology("KafkaTopology");

               // Shut down the Storm local cluster
               cluster.shutdown();
           }
       }
   }
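The kafkaConfig.scheme line tells the spout how to turn the raw bytes of each Kafka message into tuple values before they reach the LogSplitter bolt. Conceptually, storm-kafka's StringScheme performs a UTF-8 decode into a single-field tuple. The following standalone sketch illustrates that behavior; the class name StringSchemeSketch is hypothetical and is not part of the storm-kafka library:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of what storm-kafka's StringScheme does with
// each raw Kafka message: decode the bytes as UTF-8 and emit them
// as a one-field tuple (the real scheme names the field "str").
public class StringSchemeSketch {

    public static List<Object> deserialize(byte[] rawMessage) {
        // Kafka 0.8 hands the spout plain byte arrays; decode as UTF-8
        String decoded = new String(rawMessage, StandardCharsets.UTF_8);
        return Collections.<Object>singletonList(decoded);
    }

    public static void main(String[] args) {
        // A sample Apache access log line as it would arrive from Kafka
        byte[] msg = "127.0.0.1 - - [01/Jan/2014:00:00:01] \"GET / HTTP/1.1\" 200 2326"
                .getBytes(StandardCharsets.UTF_8);
        // Prints the decoded log line, which the LogSplitter bolt would
        // then receive as the tuple's single string value
        System.out.println(deserialize(msg).get(0));
    }
}
```

This is why the downstream ApacheLogSplitterBolt can treat each incoming tuple as a complete log line string rather than dealing with raw bytes itself.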
This section covered how to chain the different types of bolts into a topology and how to consume the data from Kafka. In the next section, we will learn how to deploy the topology.