Producing the Apache log in Kafka

As explained in Chapter 4, Storm and Kafka Integration, Kafka is a distributed messaging queue that integrates very well with Storm. In this section, you'll see how to write a Kafka producer that reads the server logfile and publishes each log line to Kafka.

As we all know, Storm provides guaranteed message processing, which means every message that enters the Storm topology will be processed at least once. In Storm, data loss is possible only at the spout. This happens if the processing capacity of the Storm spout is less than the producing capacity of the data publisher. Hence, to avoid data loss at the Storm spout, we generally publish the data into a messaging queue, and the Storm spout uses that messaging queue as its data source.
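
For reference, the following is a minimal sketch of how a Storm spout could later consume such a queue, using the storm-kafka spout discussed in Chapter 4, Storm and Kafka Integration; the ZooKeeper address, ZooKeeper root path, and the spout IDs used here are only placeholder assumptions for illustration:

    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class ApacheLogSpoutSketch {
      public static void main(String[] args) {
        // ZooKeeper ensemble that tracks the Kafka brokers
        ZkHosts zkHosts = new ZkHosts("localhost:2181");

        // Read the apache_log topic; consumer offsets are stored
        // under the /apache_log_spout path in ZooKeeper
        SpoutConfig spoutConfig =
            new SpoutConfig(zkHosts, "apache_log", "/apache_log_spout", "log-spout-id");

        // Emit every Kafka message as a single string field
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Wire the spout into a topology; the processing bolts would be added here
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-log-spout", new KafkaSpout(spoutConfig), 1);
      }
    }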

We will create a Maven project that will publish the server log into a Kafka broker. Perform the following steps to create the server log producer:

  1. Create a new Maven project with com.learningstorm for groupId and kafkaLogProducer for artifactId.
  2. Add the following dependencies for Kafka in pom.xml:
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.0-beta9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-1.2-api</artifactId>
      <version>2.0-beta9</version>
    </dependency>
  3. Add the following build plugin to pom.xml. This plugin will let us execute the producer using Maven:
    <build>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>1.2.1</version>
          <executions>
            <execution>
              <goals>
                <goal>exec</goal>
              </goals>
            </execution>
          </executions>
          <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>com.learningstorm.kafkaLogProducer.ApacheLogProducer</mainClass>
          </configuration>
        </plugin>
      </plugins>
    </build>
  4. Now, we will create the ApacheLogProducer class in the com.learningstorm.kafkaLogProducer package. This class will read the server logfile and publish each log line as a single message to the apache_log topic in Kafka. The following is the code for the ApacheLogProducer class with its explanation:
    package com.learningstorm.kafkaLogProducer;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ApacheLogProducer {

      public static void main(String[] args) {
        // Build the configuration required
        // for connecting to Kafka
        Properties props = new Properties();

        // List of Kafka brokers.
        // The complete list of brokers is not required as
        // the producer will auto discover
        // the rest of the brokers.
        props.put("metadata.broker.list", "localhost:9092");

        // Serializer used for sending data to Kafka.
        // Since we are sending strings,
        // we are using StringEncoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // We want acknowledgement from Kafka that
        // the messages have been properly received.
        props.put("request.required.acks", "1");

        // Create the producer instance
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        try {
          FileInputStream fstream = new FileInputStream("./src/main/resources/apache_test.log");
          BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
          String strLine;
          // Read the logfile line by line and publish each line
          // as a single message in the apache_log topic
          while ((strLine = br.readLine()) != null) {
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("apache_log", strLine);
            producer.send(data);
          }
          br.close();
          fstream.close();
        } catch (Exception e) {
          throw new RuntimeException("Error occurred while producing the log records", e);
        }

        // Close the producer
        producer.close();
      }
    }

    Replace localhost in the preceding ApacheLogProducer class with the IP address of your Kafka broker machine.

    Also, replace ./src/main/resources/apache_test.log (the server log path) with the path of your logfile.
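
    If you produce into a topic with more than one partition, you can optionally attach a key to each message so that Kafka routes related lines to the same partition. The following is a small sketch of how the KeyedMessage construction inside the preceding while loop could be changed; using the client IP (the first space-separated field of an Apache log line) as the key is only an illustrative assumption:

    // Use the client IP (the first space-separated field of the Apache
    // log line) as the message key so that all the lines from the same
    // client are routed to the same partition of the apache_log topic.
    String clientIp = strLine.split(" ")[0];
    KeyedMessage<String, String> data =
        new KeyedMessage<String, String>("apache_log", clientIp, strLine);
    producer.send(data);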

  5. The preceding ApacheLogProducer class will directly produce the log data in the apache_log topic in Kafka. Hence, you need to create the apache_log topic in Kafka before running ApacheLogProducer. To do so, go to the Kafka home directory and execute the following command:
    bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic apache_log
    creation succeeded!
    
  6. Now, you can run ApacheLogProducer by executing the following Maven command. The ApacheLogProducer needs to be run on a machine where the server logs are generated:
    mvn compile exec:java
    
  7. Now, run the Kafka console consumer to check whether the messages are successfully produced in Kafka. Run the following command to start the Kafka console consumer:
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic apache_log --from-beginning
    

    The following information is displayed:

    4.19.162.143 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
    4.19.162.152 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
    4.20.73.15 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
    4.20.73.32 - - [4-03-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
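
    If you prefer to verify the produced messages from Java code instead of the console consumer, the following is a minimal sketch of a Kafka 0.8 high-level consumer that prints every message of the apache_log topic; the class name and the consumer group ID are assumptions made only for this example:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class ApacheLogVerifier {

      public static void main(String[] args) {
        // The 0.8 high-level consumer connects through ZooKeeper
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "apache-log-verifier");
        // Start from the earliest available offset, like --from-beginning
        props.put("auto.offset.reset", "smallest");

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for a single stream of the apache_log topic
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(Collections.singletonMap("apache_log", 1));

        // Print every log line as it is consumed
        for (MessageAndMetadata<byte[], byte[]> record : streams.get("apache_log").get(0)) {
          System.out.println(new String(record.message()));
        }
      }
    }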
    