Integrating Storm with Redis

Redis is a key-value data store. The values can be strings, lists, sets, hashes, and so on. It is extremely fast because the entire dataset is held in memory. The following are the steps to install Redis:

  1. First, you will need to install make, gcc, and cc to compile the Redis code using the following command:
    sudo yum -y install make gcc cc
    
  2. Download, unpack, and build Redis, and copy the binaries to /usr/local/bin using the following commands:
    cd /home/$USER
    

    Here, $USER is the name of the Linux user.

    wget http://download.redis.io/releases/redis-2.6.16.tar.gz
    tar -xvf redis-2.6.16.tar.gz
    cd redis-2.6.16
    make
    sudo cp src/redis-server /usr/local/bin
    sudo cp src/redis-cli /usr/local/bin
    
  3. Execute the following commands to set up Redis as a service:
    sudo mkdir -p /etc/redis
    sudo mkdir -p /var/redis
    cd /home/$USER/redis-2.6.16/
    sudo cp utils/redis_init_script /etc/init.d/redis
    wget https://bitbucket.org/ptylr/public-stuff/raw/41d5c8e87ce6adb34aa16cd571c3f04fb4d5e7ac/etc/init.d/redis
    sudo cp redis /etc/init.d/redis
    cd /home/$USER/redis-2.6.16/
    sudo cp redis.conf /etc/redis/redis.conf
    
  4. Now, run the following commands to add the service to chkconfig, set it to autostart, and actually start the service:
    sudo chkconfig --add redis
    sudo chkconfig redis on
    sudo service redis start
    
  5. Check the installation of Redis with the following command:
    redis-cli ping
    

    If the result of the test command is PONG, then the installation has been successful.

    Now, we will assume that you have the Redis service up and running.

    Next, we will create a sample Storm topology that will explain how you can store the data processed by Storm in Redis.
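
The same check as redis-cli ping can be made from Java before submitting a topology, so that a missing Redis server is noticed early. The following is a minimal sketch that uses only the JDK and speaks the Redis wire protocol directly instead of going through Jedis. The RedisPing class and its encode(), isPong(), and ping() methods are hypothetical names introduced for this illustration, and the sketch assumes a server that does not require authentication.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of the original project. */
final class RedisPing {

    /** Encodes a command and its arguments as a Redis protocol array of bulk strings. */
    static String encode(String... parts) {
        StringBuilder sb = new StringBuilder("*").append(parts.length).append("\r\n");
        for (String part : parts) {
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(part).append("\r\n");
        }
        return sb.toString();
    }

    /** A healthy server answers PING with the simple string +PONG. */
    static boolean isPong(String replyLine) {
        return "+PONG".equals(replyLine);
    }

    /** Returns true only if the server at host:port answers PING with PONG in time. */
    static boolean ping(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            OutputStream out = socket.getOutputStream();
            out.write(encode("PING").getBytes(StandardCharsets.UTF_8));
            out.flush();
            BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            // readLine() strips the trailing \r\n from the reply.
            return isPong(in.readLine());
        } catch (IOException e) {
            // Connection refused, timeout, or a broken stream all count as "not available".
            return false;
        }
    }
}
```

Calling RedisPing.ping("127.0.0.1", 6379, 1000) against the service started in step 4 should return true when the server is reachable. Any connection error or unexpected reply yields false.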

  6. Create a Maven project using com.learningstorm for the Group ID and storm-redis for the Artifact ID.
  7. Add the following dependencies and repositories in the pom.xml file:
    <repositories>
      <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
      </repository>
    </repositories>
    <dependencies>
      <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
      </dependency>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.4.2</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.1.1</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.1.1</version>
      </dependency>
    </dependencies>
  8. Create a RedisOperations class in the com.learningstorm.storm_redis package. The RedisOperations class contains the following method:
    • insert(Map<String, Object> record, String id): This method takes the record and ID as input and inserts the input record in Redis. In the insert() method, we will first serialize the record into a string using the Jackson library and then store the serialized record into Redis. Each record must have a unique ID because it is used to retrieve the record from Redis.

      The following is the source code of the RedisOperations class:

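      import java.io.Serializable;
      import java.util.Map;

      import com.fasterxml.jackson.databind.ObjectMapper;
      import redis.clients.jedis.Jedis;

      public class RedisOperations implements Serializable {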
      
        private static final long serialVersionUID = 1L;
        Jedis jedis = null;
      
        public RedisOperations(String redisIP, int port) {
          // Connecting to Redis
          jedis = new Jedis(redisIP, port);
        }
        /* Takes the record and its ID as input. The record is first serialized
           into a String using the Jackson library and then stored in Redis.
           The ID can later be used to retrieve the record from Redis. */
        public void insert(Map<String, Object> record, String id) {
          try {
            jedis.set(id, new ObjectMapper().writeValueAsString(record));
          } catch (Exception e) {
            System.out.println("Record not persisted into datastore: " + e.getMessage());
          }
        }
      }
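
To make the store-then-retrieve flow of insert() concrete, the following is a minimal sketch that runs without a Redis server or any external library. It is not the RedisOperations class itself. InMemoryRecordStore is a hypothetical stand-in in which a HashMap plays the role of Redis and a simplified toJson() helper plays the role of Jackson, handling only flat records. The sample values are made up for the illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for RedisOperations; not part of the original project. */
final class InMemoryRecordStore {
    // Plays the role of Redis: one string value per key.
    private final Map<String, String> store = new HashMap<String, String>();

    /** Mirrors insert(): serialize the record, then store it under the given ID. */
    void insert(Map<String, Object> record, String id) {
        store.put(id, toJson(record));
    }

    /** Mirrors a lookup by key: returns the serialized record, or null if the ID is unknown. */
    String get(String id) {
        return store.get(id);
    }

    /** Simplified serializer for flat records; a stand-in for Jackson, which handles far more. */
    static String toJson(Map<String, Object> record) {
        StringBuilder json = new StringBuilder("{");
        // Sort the keys so that the output is deterministic for this example.
        for (Map.Entry<String, Object> entry : new TreeMap<String, Object>(record).entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(escape(entry.getKey())).append("\":");
            Object value = entry.getValue();
            if (value == null || value instanceof Number || value instanceof Boolean) {
                json.append(value);
            } else {
                json.append('"').append(escape(value.toString())).append('"');
            }
        }
        return json.append('}').toString();
    }

    // Escapes only backslashes and double quotes; control characters are not handled.
    private static String escape(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Inserting a record with firstName John, lastName Doe, and companyName Acme under the ID id-1 and then calling get("id-1") returns the string {"companyName":"Acme","firstName":"John","lastName":"Doe"}. This shows why the ID must be unique: a second insert under the same ID replaces the first value.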
  9. We will use the same SampleSpout class created in the Integrating Storm with HBase section.
  10. Create a StormRedisBolt class in the com.learningstorm.storm_redis package. This bolt receives the tuples emitted by the SampleSpout class, converts each tuple into a record map, and then calls the insert() method of the RedisOperations class to insert the record into Redis. The following is the source code of the StormRedisBolt class:
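    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    public class StormRedisBolt implements IBasicBolt {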
    
      private static final long serialVersionUID = 2L;
      private RedisOperations redisOperations = null;
      private String redisIP = null;
      private int port;
      public StormRedisBolt(String redisIP, int port) {
        this.redisIP = redisIP;
        this.port = port;
      }
    
      public void execute(Tuple input, BasicOutputCollector collector) {
        Map<String, Object> record = new HashMap<String, Object>();
        // Copy the "firstName", "lastName", and "companyName" fields from the tuple
        record.put("firstName", input.getValueByField("firstName"));
        record.put("lastName", input.getValueByField("lastName"));
        record.put("companyName", input.getValueByField("companyName"));
        redisOperations.insert(record, UUID.randomUUID().toString());
      }
    
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
      }
    
      public Map<String, Object> getComponentConfiguration() {
        return null;
      }
    
      public void prepare(Map stormConf, TopologyContext context) {
        redisOperations = new RedisOperations(this.redisIP, this.port);
      }
    
      public void cleanup() {
    
      }
    
    }

    In the StormRedisBolt class, we are using the java.util.UUID class to generate the Redis key.
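
Because UUID.randomUUID() returns a new value on every call, a tuple that happens to be processed twice is stored twice under two different keys. If that matters for your data, one alternative is a name-based UUID derived from the record fields, so the same record always maps to the same key. The following is a sketch of both options. RedisKeys, randomKey(), and deterministicKey() are hypothetical names that are not part of the original project, and the sketch assumes the field values never contain the | separator.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical key helper; not part of the original project. */
final class RedisKeys {

    /** The approach used by StormRedisBolt: a fresh random (version 4) UUID per call. */
    static String randomKey() {
        return UUID.randomUUID().toString();
    }

    /**
     * Alternative: a name-based (version 3) UUID computed from the record fields.
     * Equal field values always produce the same key.
     */
    static String deterministicKey(String firstName, String lastName, String companyName) {
        // Assumption: '|' does not occur inside the field values.
        String joined = firstName + "|" + lastName + "|" + companyName;
        return UUID.nameUUIDFromBytes(joined.getBytes(StandardCharsets.UTF_8)).toString();
    }
}
```

The trade-off is that two distinct records with identical field values share one key, so the later write replaces the earlier one. Random keys avoid that collision at the cost of possible duplicates.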

  11. Create a Topology class in the com.learningstorm.storm_redis package. This class creates an instance of the spout and bolt classes and chains them together using a TopologyBuilder class. The following is the implementation of the main class:
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;

    public class Topology {
      public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();

        // set the spout class
        builder.setSpout("spout", new SampleSpout(), 2);
        // set the bolt class
        builder.setBolt("bolt", new StormRedisBolt("127.0.0.1",6379), 2).shuffleGrouping("spout");
    
        Config conf = new Config();
        conf.setDebug(true);
        // create an instance of LocalCluster class for
        // executing topology in local mode.
        LocalCluster cluster = new LocalCluster();
    
        // StormRedisTopology  is the name of submitted topology.
        cluster.submitTopology("StormRedisTopology", conf, builder.createTopology());
        try {
          Thread.sleep(10000);
        } catch (Exception exception) {
          System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the StormRedisTopology
        cluster.killTopology("StormRedisTopology");
        // shutdown the storm test cluster
        cluster.shutdown();
    
      }
    }

In this section, we covered the installation of Redis and how to integrate Storm with Redis.
