This section explains how to persist the processed data to a data store. In this use case, we use MySQL as the data store for the processed data.
We assume that you have MySQL installed on your CentOS machine. If you don't, you can follow the blog at http://www.rackspace.com/knowledge_center/article/installing-mysql-server-on-centos to install it. Let's perform the following steps to persist records to MySQL:
Add the following dependency to the pom.xml file of the stormlogprocessing project:

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.6</version>
    </dependency>
Create the MySQLConnection class in the com.learningstorm.stormlogprocessing package. This class contains the getMySQLConnection(String ip, String database, String user, String password) method, which returns the MySQL connection. The following is the source code of the MySQLConnection class:

    package com.learningstorm.stormlogprocessing;

    import java.sql.Connection;
    import java.sql.DriverManager;

    /**
     * This class returns the MySQL connection.
     */
    public class MySQLConnection {

        private static Connection connect = null;

        /**
         * This method returns the MySQL connection.
         *
         * @param ip       IP address of the MySQL server
         * @param database name of the database
         * @param user     name of the user
         * @param password password of the given user
         * @return MySQL connection
         */
        public static Connection getMySQLConnection(String ip, String database, String user, String password) {
            try {
                // Load the MySQL driver; each database has its own driver
                Class.forName("com.mysql.jdbc.Driver");
                // Set up the connection with the database
                connect = DriverManager.getConnection("jdbc:mysql://" + ip + "/" + database
                        + "?user=" + user + "&password=" + password);
                return connect;
            } catch (Exception e) {
                // Keep the original exception as the cause so the failure can be diagnosed
                throw new RuntimeException("Error occurred while getting the MySQL connection", e);
            }
        }
    }
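The MySQLConnection class places the user name and password in the query string of the JDBC URL. As a minimal sketch of an alternative, assuming the same MySQL driver is on the classpath, the credentials can be passed separately through the three-argument DriverManager.getConnection(url, user, password) overload, which avoids problems when a password contains characters such as & or ?. The class name JdbcUrlSketch and the helper buildUrl are hypothetical names used only for illustration.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Illustrative sketch only; not part of the stormlogprocessing project. */
final class JdbcUrlSketch {

    /** Builds the JDBC URL without embedding any credentials (hypothetical helper). */
    static String buildUrl(String ip, String database) {
        return "jdbc:mysql://" + ip + "/" + database;
    }

    /** Opens a connection, handing the credentials to the driver as separate arguments. */
    static Connection open(String ip, String database, String user, String password)
            throws SQLException {
        return DriverManager.getConnection(buildUrl(ip, database), user, password);
    }
}
```

Calling JdbcUrlSketch.open(ip, database, user, password) would then stand in for the string concatenation inside getMySQLConnection, with the rest of the class unchanged.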
Create the MySQLDump class in the com.learningstorm.stormlogprocessing package. This class has a parameterized constructor that takes the IP address, the database name, the username, and the password of the MySQL server as arguments. It calls the getMySQLConnection(ip, database, user, password) method of the MySQLConnection class to get the MySQL connection. The MySQLDump class contains the persistRecord(Tuple tuple) method, which persists the tuples into MySQL. The following is the source code of the MySQLDump class:

    package com.learningstorm.stormlogprocessing;

    import java.sql.Connection;
    import java.sql.PreparedStatement;

    import backtype.storm.tuple.Tuple;

    /**
     * This class contains logic to persist the record
     * into the MySQL database.
     */
    public class MySQLDump {
        /** Name of the database you want to connect to */
        private String database;
        /** Name of the MySQL user */
        private String user;
        /** IP address of the MySQL server */
        private String ip;
        /** Password of the MySQL user */
        private String password;
        /** The MySQL connection */
        private final Connection connect;

        public MySQLDump(String ip, String database, String user, String password) {
            this.ip = ip;
            this.database = database;
            this.user = user;
            this.password = password;
            // Get the MySQL connection here, after the fields are assigned.
            // A field initializer would run before the constructor body,
            // while ip, database, user, and password are still null.
            this.connect = MySQLConnection.getMySQLConnection(ip, database, user, password);
        }

        /**
         * Persist the input tuple.
         *
         * @param tuple the tuple to persist
         */
        public void persistRecord(Tuple tuple) {
            PreparedStatement preparedStatement = null;
            try {
                // Prepared statements can use variables and are more efficient
                preparedStatement = connect.prepareStatement(
                        "insert into apachelog values (default, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
                preparedStatement.setString(1, tuple.getStringByField("ip"));
                preparedStatement.setString(2, tuple.getStringByField("dateTime"));
                preparedStatement.setString(3, tuple.getStringByField("request"));
                preparedStatement.setString(4, tuple.getStringByField("response"));
                preparedStatement.setString(5, tuple.getStringByField("bytesSent"));
                preparedStatement.setString(6, tuple.getStringByField("referrer"));
                preparedStatement.setString(7, tuple.getStringByField("useragent"));
                preparedStatement.setString(8, tuple.getStringByField("country"));
                preparedStatement.setString(9, tuple.getStringByField("browser"));
                preparedStatement.setString(10, tuple.getStringByField("os"));
                preparedStatement.setString(11, tuple.getStringByField("keyword"));
                // Insert the record
                preparedStatement.executeUpdate();
            } catch (Exception e) {
                throw new RuntimeException("Error occurred while persisting records in MySQL", e);
            } finally {
                // Close the prepared statement
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (Exception exception) {
                        System.out.println("Error occurred while closing PreparedStatement: " + exception);
                    }
                }
            }
        }

        public void close() {
            try {
                connect.close();
            } catch (Exception exception) {
                System.out.println("Error occurred while closing the connection: " + exception);
            }
        }
    }
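The insert statement in persistRecord relies on the placeholders lining up, in order, with the eleven tuple fields. The following is a minimal sketch of one way to keep the two in step by deriving the statement from a single list of field names. The field names are the ones used in persistRecord; the class name InsertSqlSketch and the helper buildInsertSql are hypothetical and not part of the project.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; not part of the stormlogprocessing project. */
final class InsertSqlSketch {

    /** Tuple fields in the order in which persistRecord binds them. */
    static final List<String> FIELDS = Arrays.asList(
            "ip", "dateTime", "request", "response", "bytesSent", "referrer",
            "useragent", "country", "browser", "os", "keyword");

    /**
     * Builds an insert statement with "default" for the first column
     * followed by one "?" placeholder per field (hypothetical helper).
     */
    static String buildInsertSql(String table, int fieldCount) {
        StringBuilder sql = new StringBuilder("insert into ")
                .append(table)
                .append(" values (default");
        for (int i = 0; i < fieldCount; i++) {
            sql.append(", ?");
        }
        return sql.append(")").toString();
    }
}
```

With such a list in place, the eleven setString calls could be replaced by a loop that binds tuple.getStringByField(FIELDS.get(i)) to parameter index i + 1, since JDBC parameter indexes start at 1.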
Create the PersistenceBolt class in the com.learningstorm.stormlogprocessing package. This class implements the backtype.storm.topology.IBasicBolt interface. The PersistenceBolt class has a parameterized constructor that takes the IP address, the database name, the username, and the password of the MySQL server as arguments. The execute() method of the PersistenceBolt class calls the persistRecord(Tuple tuple) method of the MySQLDump class to persist the record into MySQL. The following is the source code of the PersistenceBolt class:

    package com.learningstorm.stormlogprocessing;

    import java.util.Map;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    /**
     * This bolt calls the persistRecord(....) method
     * of the MySQLDump class to persist
     * the record into the MySQL database.
     *
     * @author Admin
     */
    public class PersistenceBolt implements IBasicBolt {

        private MySQLDump mySQLDump = null;
        private static final long serialVersionUID = 1L;
        /** Name of the database you want to connect to */
        private String database;
        /** Name of the MySQL user */
        private String user;
        /** IP address of the MySQL server */
        private String ip;
        /** Password of the MySQL user */
        private String password;

        public PersistenceBolt(String ip, String database, String user, String password) {
            this.ip = ip;
            this.database = database;
            this.user = user;
            this.password = password;
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // This bolt does not emit any tuples
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        public void prepare(Map stormConf, TopologyContext context) {
            // Create the instance of the MySQLDump(....) class
            mySQLDump = new MySQLDump(ip, database, user, password);
        }

        /**
         * This method calls the persistRecord(input) method
         * of the MySQLDump class to persist records into MySQL.
         */
        public void execute(Tuple input, BasicOutputCollector collector) {
            System.out.println("Input tuple : " + input);
            mySQLDump.persistRecord(input);
        }

        public void cleanup() {
            // Close the connection
            mySQLDump.close();
        }
    }
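The PersistenceBolt class stores only the connection settings in its constructor and creates the MySQLDump instance in prepare(). Storm serializes bolt instances when a topology is submitted and deserializes them on the worker, so a live resource such as a database connection is normally created after that step. The following is a minimal sketch of the pattern using plain Java serialization, with a plain Object standing in for the connection. The class name PrepareSketch and its methods are hypothetical, and the sketch illustrates the general idea rather than Storm's own internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Illustrative sketch only; not part of the stormlogprocessing project. */
final class PrepareSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Serializable setting, kept from the constructor. */
    private final String ip;

    /** Stand-in for a non-serializable resource such as a database connection. */
    private transient Object resource;

    PrepareSketch(String ip) {
        this.ip = ip;
    }

    /** Creates the resource; in a bolt this would happen on the worker. */
    void prepare() {
        resource = new Object();
    }

    boolean isPrepared() {
        return resource != null;
    }

    String getIp() {
        return ip;
    }

    /** Serializes and deserializes the object, as happens when it is shipped to a worker. */
    static PrepareSketch roundTrip(PrepareSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(original);
            out.close();
            ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()));
            try {
                return (PrepareSketch) in.readObject();
            } finally {
                in.close();
            }
        } catch (IOException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

After the round trip, the copy still carries its settings but has no resource until prepare() is called, which mirrors how PersistenceBolt keeps ip, database, user, and password as fields and opens the connection only in prepare().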
In this section, we covered how to insert the input tuples into the data store.