As explained in Chapter 4, Storm and Kafka Integration, Kafka is a distributed messaging queue that integrates well with Storm. In this section, you'll see how to write a Kafka producer that reads a server logfile and publishes each log line to Kafka.
Storm provides guaranteed message processing: every message that enters a Storm topology is processed at least once. Data loss is possible only at the spout, and only when the spout's processing capacity is lower than the data publisher's producing capacity. Hence, to avoid data loss at the spout, we generally publish the data into a messaging queue, and the Storm spout uses that messaging queue as its data source.
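The buffering role that the messaging queue plays can be illustrated with a plain in-process bounded queue. This is only a sketch of the decoupling idea (the class and message names are made up for illustration), not Kafka itself:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueBufferSketch {
    public static void main(String[] args) throws InterruptedException {
        // A bounded queue standing in for the messaging layer between
        // a fast publisher and a slower consumer (the spout side).
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        Thread publisher = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    // put() blocks when the queue is full, so the
                    // publisher slows down instead of dropping messages.
                    queue.put("log-line-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        publisher.start();

        int consumed = 0;
        while (consumed < 1000) {
            queue.take(); // the "spout" drains at its own pace
            consumed++;
        }
        publisher.join();
        System.out.println("consumed " + consumed + " messages, none lost");
    }
}
```

Kafka provides the same back-pressure-friendly decoupling, but durably and across machines.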
We will create a Maven project that will publish the server log into a Kafka broker. Perform the following steps to create the server log producer:
Create a new Maven project with com.learningstorm for groupId and kafkaLogProducer for artifactId. Add the following dependencies to pom.xml:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.0</version>
  <exclusions>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-slf4j-impl</artifactId>
  <version>2.0-beta9</version>
</dependency>
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-1.2-api</artifactId>
  <version>2.0-beta9</version>
</dependency>
Add the following build plugins to pom.xml. These plugins will let us execute the producer using Maven (the mainClass entry must match the ApacheLogProducer class we create next):

<build>
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <classpathScope>compile</classpathScope>
        <mainClass>com.learningstorm.kafkaLogProducer.ApacheLogProducer</mainClass>
      </configuration>
    </plugin>
  </plugins>
</build>
Create the ApacheLogProducer class in the com.learningstorm.kafkaLogProducer package. This class will read the server logfile and publish each log line to the apache_log topic in Kafka as a single message. The following is the code for the ApacheLogProducer class with its explanation:

package com.learningstorm.kafkaLogProducer;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ApacheLogProducer {
    public static void main(String[] args) {
        // Build the configuration required for connecting to Kafka
        Properties props = new Properties();

        // List of Kafka brokers. The complete list of brokers is not
        // required, as the producer will auto-discover the rest.
        props.put("metadata.broker.list", "localhost:9092");

        // Serializer used for sending data to Kafka. Since we are
        // sending strings, we use StringEncoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // We want an acknowledgement from Kafka that the messages
        // have been properly received.
        props.put("request.required.acks", "1");

        // Create the producer instance
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        try {
            FileInputStream fstream = new FileInputStream("./src/main/resources/apache_test.log");
            BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
            String strLine;
            // Read the log line by line and publish each line
            // as a single message in the apache_log topic
            while ((strLine = br.readLine()) != null) {
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("apache_log", strLine);
                producer.send(data);
            }
            br.close();
            fstream.close();
        } catch (Exception e) {
            throw new RuntimeException("Error occurred while persisting records", e);
        } finally {
            // Close the producer
            producer.close();
        }
    }
}
Replace localhost in the preceding ApacheLogProducer class with the IP address of the broker machine. Also, replace ./src/main/resources/apache_test.log (the server log path) with the path of your logfile.
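Rather than editing the source each time, you could pass the broker address and log path in as program arguments. This argument handling is our own addition, not part of the book's producer; only the fallback defaults come from the code above:

```java
public class ProducerArgs {
    public static void main(String[] args) {
        // Fall back to the defaults used in ApacheLogProducer
        // when no arguments are supplied.
        String brokerList = args.length > 0 ? args[0]
                : "localhost:9092";
        String logPath = args.length > 1 ? args[1]
                : "./src/main/resources/apache_test.log";
        System.out.println("broker list: " + brokerList);
        System.out.println("log path: " + logPath);
    }
}
```

With the exec-maven-plugin, such arguments can be supplied on the command line via -Dexec.args="broker-host:9092 /var/log/apache2/access.log".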
The ApacheLogProducer class will directly produce the log data into the apache_log topic in Kafka. Hence, you need to create the apache_log topic before you run ApacheLogProducer. To do so, go to the home directory of Kafka and execute the following command:

bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic apache_log

The following output confirms that the topic was created:

creation succeeded!
Now, run ApacheLogProducer by executing the following Maven command. ApacheLogProducer needs to be run on the machine where the server logs are generated:

mvn compile exec:java
To verify that the messages are reaching Kafka, consume the apache_log topic with the Kafka console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic apache_log --from-beginning
The following information is displayed:
4.19.162.143 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
4.19.162.152 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
4.20.73.15 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
4.20.73.32 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
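Each message in the apache_log topic is one raw line in Apache's combined log format. A downstream consumer, such as a Storm bolt, could pick such a line apart with a regular expression. This parsing sketch is our own illustration, not code from the chapter:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {
    // Groups: 1 = client IP, 2 = timestamp, 3 = request line,
    // 4 = status code, 5 = response bytes, 6 = referrer, 7 = user agent
    private static final Pattern COMBINED = Pattern.compile(
        "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+) "
        + "\"([^\"]*)\" \"([^\"]*)\"");

    public static void main(String[] args) {
        // One of the sample lines consumed from the apache_log topic
        String line = "4.19.162.143 - - [4-03-2011:06:20:31 -0500] "
                + "\"GET / HTTP/1.1\" 200 864 "
                + "\"http://www.adeveloper.com/resource.html\" "
                + "\"Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) "
                + "Gecko/20050919 Firefox/1.0.7\"";

        Matcher m = COMBINED.matcher(line);
        if (m.find()) {
            System.out.println("ip=" + m.group(1));
            System.out.println("request=" + m.group(3));
            System.out.println("status=" + m.group(4));
        }
    }
}
```

A bolt would typically emit these groups as separate tuple fields so that later bolts can group or count by IP address or status code.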
3.143.235.23