Redis is a key-value data store. The values can be strings, lists, sets, hashes, and so on. It is extremely fast because the entire dataset is stored in memory. The following are the steps to install Redis:
1. Install make, gcc, and cc to compile the Redis code using the following command:

   sudo yum -y install make gcc cc
2. Download, extract, and compile the Redis source code, and copy the redis-server and redis-cli binaries to /usr/local/bin using the following commands:

   cd /home/$USER

   Here, $USER is the name of the Linux user.

   wget http://download.redis.io/releases/redis-2.6.16.tar.gz
   tar -xvf redis-2.6.16.tar.gz
   cd redis-2.6.16
   make
   sudo cp src/redis-server /usr/local/bin
   sudo cp src/redis-cli /usr/local/bin
3. Create the Redis configuration and data directories, and copy the init script and configuration file to their standard locations:

   sudo mkdir -p /etc/redis
   sudo mkdir -p /var/redis
   cd /home/$USER/redis-2.6.16/
   sudo cp utils/redis_init_script /etc/init.d/redis
   wget https://bitbucket.org/ptylr/public-stuff/raw/41d5c8e87ce6adb34aa16cd571c3f04fb4d5e7ac/etc/init.d/redis
   sudo cp redis /etc/init.d/redis
   cd /home/$USER/redis-2.6.16/
   sudo cp redis.conf /etc/redis/redis.conf
4. Register the Redis service with chkconfig, set it to autostart, and start the service:

   chkconfig --add redis
   chkconfig redis on
   service redis start
5. Verify the installation with the following command:

   redis-cli ping

   If the result of the test command is PONG, then the installation has been successful.
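Under the hood, redis-cli speaks Redis's plain-text RESP protocol, so the same PING check can be reproduced from any language with a socket. The following is a minimal sketch of how a command such as PING is encoded on the wire; the RespEncoder class and its method are our own illustrative names, not part of any Redis client library.

```java
// Minimal encoder for the request side of the Redis RESP protocol.
// A command is sent as an array of bulk strings:
// *<argc>\r\n followed by $<len>\r\n<arg>\r\n for each argument.
// (For simplicity this assumes ASCII arguments, where the character
// count equals the byte count RESP requires.)
public class RespEncoder {
    public static String encode(String... args) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(args.length).append("\r\n");
        for (String arg : args) {
            sb.append('$').append(arg.length()).append("\r\n")
              .append(arg).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // This is the byte sequence sent for PING; a healthy server
        // replies with the simple string "+PONG\r\n".
        System.out.println(encode("PING").replace("\r\n", "\\r\\n"));
    }
}
```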
Now, we will assume that you have the Redis service up and running.
Next, we will create a sample Storm topology that will explain how you can store the data processed by Storm in Redis.
Create a Maven project using com.learningstorm for the Group ID and storm-redis for the Artifact ID. Add the following repository and dependencies to the pom.xml file:

<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0.1</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.4.2</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.1.1</version>
  </dependency>
</dependencies>
Create a RedisOperations class in the com.learningstorm.storm_redis package. The RedisOperations class contains the following method:

insert(Map<String, Object> record, String id): This method takes a record and an ID as input and inserts the record into Redis. In the insert() method, we first serialize the record into a string using the Jackson library and then store the serialized record in Redis under the given ID. Each record must have a unique ID because it is used to retrieve the record from Redis.

The following is the source code of the RedisOperations class:
import java.io.Serializable;
import java.util.Map;

import redis.clients.jedis.Jedis;

import com.fasterxml.jackson.databind.ObjectMapper;

public class RedisOperations implements Serializable {
    private static final long serialVersionUID = 1L;
    Jedis jedis = null;

    public RedisOperations(String redisIP, int port) {
        // Connect to Redis
        jedis = new Jedis(redisIP, port);
    }

    /*
     * This method takes the record and the record ID as input. We first
     * serialize the record into a String using the Jackson library and
     * then store the whole record in Redis. The record ID can be used
     * to retrieve the record from Redis.
     */
    public void insert(Map<String, Object> record, String id) {
        try {
            jedis.set(id, new ObjectMapper().writeValueAsString(record));
        } catch (Exception e) {
            System.out.println("Record not persisted into datastore");
        }
    }
}
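The insert() method relies on Jackson's ObjectMapper to turn the record map into a single JSON string before it is stored. To make the stored format concrete, the following standalone sketch hand-rolls the same serialization for a flat map of plain string values (assuming no nested objects and no characters that need escaping; the sample field values are hypothetical). In the real code, Jackson handles all of this.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrates the shape of the value stored in Redis by insert():
// the record map serialized as one JSON string.
public class RecordJson {
    // Simplified serializer for flat maps with plain string values;
    // ObjectMapper.writeValueAsString() produces equivalent output
    // for such records.
    public static String toJson(Map<String, String> record) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : record.entrySet()) {
            if (!first) sb.append(',');
            sb.append('"').append(e.getKey()).append("\":\"")
              .append(e.getValue()).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Hypothetical record of the kind the topology produces
        Map<String, String> record = new LinkedHashMap<>();
        record.put("firstName", "john");
        record.put("lastName", "watson");
        record.put("companyName", "abc");
        // A generated UUID would be the Redis key; this JSON is the value.
        System.out.println(toJson(record));
    }
}
```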
In this topology, we will use the SampleSpout class created in the Integrating Storm with HBase section. Create a StormRedisBolt class in the com.learningstorm.storm_redis package. This bolt receives the tuples emitted by the SampleSpout class, converts each tuple to the Redis structure, and then calls the insert() method of the RedisOperations class to insert the record into Redis. The following is the source code of the StormRedisBolt class:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class StormRedisBolt implements IBasicBolt {
    private static final long serialVersionUID = 2L;
    private RedisOperations redisOperations = null;
    private String redisIP = null;
    private int port;

    public StormRedisBolt(String redisIP, int port) {
        this.redisIP = redisIP;
        this.port = port;
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        Map<String, Object> record = new HashMap<String, Object>();
        // fields emitted by SampleSpout: "firstName", "lastName", "companyName"
        record.put("firstName", input.getValueByField("firstName"));
        record.put("lastName", input.getValueByField("lastName"));
        record.put("companyName", input.getValueByField("companyName"));
        redisOperations.insert(record, UUID.randomUUID().toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit any tuples
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void prepare(Map stormConf, TopologyContext context) {
        redisOperations = new RedisOperations(this.redisIP, this.port);
    }

    public void cleanup() {
    }
}
In the StormRedisBolt class, we are using the java.util.UUID class to generate the Redis key.
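Because every record needs a unique Redis key, the bolt calls UUID.randomUUID() once per tuple. The following standalone snippet (the class name is ours, for illustration only) shows what these keys look like:

```java
import java.util.UUID;

// Demonstrates the kind of key StormRedisBolt generates for each record.
public class RedisKeyDemo {
    public static String newKey() {
        // Random (version 4) UUID rendered as
        // 8-4-4-4-12 lowercase hex groups
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        String key = newKey();
        System.out.println(key);
        // 36 characters: 32 hex digits plus 4 hyphens
        System.out.println(key.length());
    }
}
```

Note that a randomly generated key can only be used for later lookups if it is recorded somewhere else; when the data carries a natural unique identifier, using that as the key is an alternative design choice.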
Create a Topology class in the com.learningstorm.storm_redis package. This class creates instances of the spout and bolt classes and chains them together using a TopologyBuilder class. The following is the implementation of the main class:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class Topology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        // set the spout class
        builder.setSpout("spout", new SampleSpout(), 2);
        // set the bolt class
        builder.setBolt("bolt", new StormRedisBolt("127.0.0.1", 6379), 2).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(true);
        // create an instance of the LocalCluster class for
        // executing the topology in local mode
        LocalCluster cluster = new LocalCluster();
        // StormRedisTopology is the name of the submitted topology
        cluster.submitTopology("StormRedisTopology", conf, builder.createTopology());
        try {
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the StormRedisTopology
        cluster.killTopology("StormRedisTopology");
        // shut down the storm test cluster
        cluster.shutdown();
    }
}
In this section, we covered the installation of Redis and how to integrate Storm with it.