Persisting the processed data

This section explains how to persist the processed data to a data store. In this use case, we use MySQL as the data store.

We assume that MySQL is already installed on your CentOS machine. If it is not, you can follow the blog at http://www.rackspace.com/knowledge_center/article/installing-mysql-server-on-centos to install it. Let's perform the following steps to persist records to MySQL:

  1. Add the following dependency to the pom.xml file of the stormlogprocessing project:
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.6</version>
        </dependency>
  2. We will create a MySQLConnection class in the com.learningstorm.stormlogprocessing package. This class contains the static getMySQLConnection(String ip, String database, String user, String password) method, which opens and returns a MySQL connection. The following is the source code of the MySQLConnection class:
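    package com.learningstorm.stormlogprocessing;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    
    /**
     * This class returns the MySQL connection.
     */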
    public class MySQLConnection {
    
      private static Connection connect = null;
    
      /**
       * This method returns the MySQL connection.
       * 
       * @param ip
       *            IP address of the MySQL server
       * @param database
       *            name of the database
       * @param user
       *            name of the user
       * @param password
       *            password of the given user
       * @return MySQL connection
       */
      public static Connection getMySQLConnection(String ip, String database, String user, String password) {
        try {
          // This will load the MySQL driver,
          // each DB has its own driver
          Class.forName("com.mysql.jdbc.Driver");
          // Set up the connection with the DB.
          // Pass the credentials as separate arguments so that special
          // characters in the password cannot corrupt the JDBC URL.
          connect = DriverManager.getConnection("jdbc:mysql://" + ip + "/" + database, user, password);
          return connect;
        } catch (Exception e) {
          // Keep the original exception as the cause so the failure can be diagnosed.
          throw new RuntimeException("Error occurred while getting the MySQL connection", e);
        }
      }
    }
  3. Now, we will create a MySQLDump class in the com.learningstorm.stormlogprocessing package. This class has a parameterized constructor that will take the IP address, the database name, the user name, and the password of the MySQL server as arguments. This class calls the getMySQLConnection(ip,database,user,password) method of the MySQLConnection class to get the MySQL connection. The MySQLDump class contains the persistRecord(Tuple tuple) method, and this method persists the tuples into MySQL. The following is the source code of the MySQLDump class:
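    package com.learningstorm.stormlogprocessing;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    
    import backtype.storm.tuple.Tuple;
    
    /**
     * This class contains logic to persist the record
     * into the MySQL database.
     */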
    public class MySQLDump {
      /**
       * Name of database you want to connect
       */
      private String database;
      /**
       * Name of MySQL user
       */
      private String user;
      /**
       * IP of the MySQL server
       */
      private String ip;
      /**
       * Password of the MySQL server
       */
      private String password;
      
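      /**
       * MySQL connection, opened in the constructor
       */
      private Connection connect;
    
      private PreparedStatement preparedStatement = null;
    
      public MySQLDump(String ip, String database, String user, String password) {
        this.ip = ip;
        this.database = database;
        this.user = user;
        this.password = password;
        // Open the connection here, after the fields are assigned. A field
        // initializer would run before this constructor body and see null values.
        this.connect = MySQLConnection.getMySQLConnection(ip, database, user, password);
      }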
      
      /**
       * Persist input tuple.
       * @param tuple
       */
      public void persistRecord(Tuple tuple) {
        try {
    
          // preparedStatements can use variables and
          // are more efficient
          preparedStatement = connect.prepareStatement("insert into apachelog values (default, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
    
          preparedStatement.setString(1, tuple.getStringByField("ip"));
          preparedStatement.setString(2, tuple.getStringByField("dateTime"));
          preparedStatement.setString(3, tuple.getStringByField("request"));
          preparedStatement.setString(4, tuple.getStringByField("response"));
          preparedStatement.setString(5, tuple.getStringByField("bytesSent"));
          preparedStatement.setString(6, tuple.getStringByField("referrer"));
          preparedStatement.setString(7, tuple.getStringByField("useragent"));
          preparedStatement.setString(8, tuple.getStringByField("country"));
          preparedStatement.setString(9, tuple.getStringByField("browser"));
          preparedStatement.setString(10, tuple.getStringByField("os"));
          preparedStatement.setString(11, tuple.getStringByField("keyword"));
          
          // Insert record
          preparedStatement.executeUpdate();
    
        } catch (Exception e) {
          // Keep the original exception as the cause so the failure can be diagnosed.
          throw new RuntimeException("Error occurred while persisting records in MySQL", e);
        } finally {
          // close prepared statement
          if (preparedStatement != null) {
            try {
              preparedStatement.close();
            } catch (Exception exception) {
              System.out.println("Error occurred while closing PreparedStatement:");
            }
          }
        }
    
      }
    
      public void close() {
        try {
          connect.close();
        } catch (Exception exception) {
          System.out.println("Error occurred while closing the connection");
        }
      }
    }
  4. Let's create a PersistenceBolt class in the com.learningstorm.stormlogprocessing package. This class implements the backtype.storm.topology.IBasicBolt interface. The PersistenceBolt class has a parameterized constructor that takes the IP address, the database name, the user name, and the password of the MySQL server as arguments. Its prepare() method creates the MySQLDump instance, and its execute() method calls the persistRecord(Tuple tuple) method of the MySQLDump class to persist the record into MySQL. The following is the source code of the PersistenceBolt class:
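    package com.learningstorm.stormlogprocessing;
    
    import java.util.Map;
    
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;
    
    /**
     * This bolt calls the persistRecord(Tuple tuple) method
     * of the MySQLDump class to persist
     * the record into the MySQL database.
     * 
     * @author Admin
     * 
     */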
    public class PersistenceBolt implements IBasicBolt {
    
      private MySQLDump mySQLDump = null;
      private static final long serialVersionUID = 1L;
      /**
       * Name of the database you want to connect
       */
      private String database;
      /**
       * Name of the MySQL user
       */
      private String user;
      /**
       * IP address of the MySQL server
       */
      private String ip;
      /**
       * Password of the MySQL server
       */
      private String password;
    
      public PersistenceBolt(String ip, String database, String user, String password) {
        this.ip = ip;
        this.database = database;
        this.user = user;
        this.password = password;
      }
    
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
      }
    
      public Map<String, Object> getComponentConfiguration() {
        return null;
      }
    
      public void prepare(Map stormConf, TopologyContext context) {
    
        // create the instance of the MySQLDump(....) class.
        mySQLDump = new MySQLDump(ip, database, user, password);
      }
      /**
       * This method calls the persistRecord(input) method
       * of the MySQLDump class to persist records into MySQL.
       */
      public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("Input tuple : " + input);
        mySQLDump.persistRecord(input);
      }
    
      public void cleanup() {
        // Close the connection
        mySQLDump.close();
      }
    
    }
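
A Java detail is worth keeping in mind for classes such as MySQLDump, which build a resource from constructor arguments: instance field initializers run before the constructor body, so an initializer that reads a field assigned in the constructor still sees null. The following is a minimal, self-contained sketch of that ordering. The InitOrderDemo class and its EagerHolder and LazyHolder members are hypothetical names used only for illustration, and a plain string stands in for the MySQL connection:

```java
public class InitOrderDemo {

  /** Hypothetical stand-in for opening a connection: it only builds a URL string. */
  public static String describe(String ip) {
    return "jdbc:mysql://" + ip;
  }

  /** Builds the URL in a field initializer, which runs before ip is assigned. */
  public static class EagerHolder {
    private String ip;
    // Evaluated before the constructor body, so ip is still null here.
    private final String url = describe(ip);

    public EagerHolder(String ip) {
      this.ip = ip;
    }

    public String url() {
      return url;
    }
  }

  /** Builds the URL inside the constructor, after the argument is available. */
  public static class LazyHolder {
    private final String url;

    public LazyHolder(String ip) {
      this.url = describe(ip);
    }

    public String url() {
      return url;
    }
  }

  public static void main(String[] args) {
    System.out.println(new EagerHolder("127.0.0.1").url()); // prints jdbc:mysql://null
    System.out.println(new LazyHolder("127.0.0.1").url());  // prints jdbc:mysql://127.0.0.1
  }
}
```

For the same reason, a connection that depends on the ip, database, user, and password values should be opened inside the constructor (or a later method) rather than in a field initializer.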

In this section, we covered how to insert the input tuples into the data store.
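
The insert statement used by persistRecord must carry exactly one ? placeholder for each field read from the tuple, after the leading default for the first column. As a minimal sketch, and not part of the stormlogprocessing project itself, the statement could be derived from the list of field names so that the two cannot drift apart. The InsertSqlBuilder class and its buildInsert method are hypothetical helpers; the table name and field names are the ones used in the MySQLDump class:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InsertSqlBuilder {

  /** Tuple fields persisted by MySQLDump, in column order. */
  public static final List<String> FIELDS = Collections.unmodifiableList(Arrays.asList(
      "ip", "dateTime", "request", "response", "bytesSent", "referrer",
      "useragent", "country", "browser", "os", "keyword"));

  /**
   * Hypothetical helper: builds "insert into <table> values (default, ?, ...)"
   * with one placeholder per field name.
   */
  public static String buildInsert(String table, List<String> fields) {
    if (fields.isEmpty()) {
      throw new IllegalArgumentException("At least one field is required");
    }
    StringBuilder sql = new StringBuilder("insert into ")
        .append(table)
        .append(" values (default");
    for (int i = 0; i < fields.size(); i++) {
      sql.append(", ?");
    }
    return sql.append(")").toString();
  }

  public static void main(String[] args) {
    System.out.println(buildInsert("apachelog", FIELDS));
  }
}
```

With such a list in place, the setString calls could be written as a loop over FIELDS that binds parameter i + 1 to tuple.getStringByField(FIELDS.get(i)), although the explicit calls shown earlier are equally valid.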
