Joining data using an external key-value store (Redis)

Key-value stores are an efficient tool for storing large datasets. In MapReduce, we can use a key-value store to house a dataset that might not fit into the memory of a mapper or mappers (remember that multiple mappers can run on the same slave node), but that can fit into the main memory of a single server.

In this recipe, we will demonstrate how to use Redis to perform a map-side join using MapReduce.

Getting ready

First, download and install Redis. This book used Redis version 2.4.15. A quick start guide is available on the Redis website, http://redis.io/topics/quickstart. Once you have compiled and installed the Redis server, start the server by issuing the following command:

$ redis-server

Verify that the Redis server is working properly by using redis-cli:

$ redis-cli ping

Redis should respond with the message "PONG" if everything has been set up properly.

Next, you will need to download and compile Jedis from https://github.com/xetorthio/jedis. Jedis is a Redis Java client that we will use in our MapReduce application to communicate with Redis. This book used Jedis version 2.1.0.

Finally, download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support. Place the apache_nobots_tsv.txt file into HDFS, and keep the nobots_ip_country_tsv.txt file in the folder that you are working in.
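Both files are tab-separated. The nobots_ip_country_tsv.txt file maps an IP address to a country, one pair per line; the addresses and countries shown here are made-up examples of the layout, not rows from the actual dataset:

```
10.1.2.3	US
10.4.5.6	DE
```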

How to do it...

Follow these steps to join data in MapReduce using Redis:

  1. Create a Java method to read the nobots_ip_country_tsv.txt file from the folder that you are working in, and load its contents into Redis using the Jedis client:
    private void loadRedis(String ipCountryTable) throws IOException {
            FileReader freader = new FileReader(ipCountryTable);
            BufferedReader breader = new BufferedReader(freader);
            jedis = new Jedis("localhost");
            jedis.select(0);
            jedis.flushDB();
            String line = breader.readLine();
            while(line != null) {
                String[] tokens = line.split("\t");
                String ip = tokens[0];
                String country = tokens[1];
                jedis.set(ip, country);
                line = breader.readLine();
            }
            System.err.println("db size = " + jedis.dbSize());
        }
  2. Next, set up a map-only MapReduce job. The following code snippet is the final version of the class used to create the map-only MapReduce job. It contains the loadRedis() method we created in step 1:
    public class MapSideJoinRedis extends Configured implements Tool {
        
        private Jedis jedis = null;
        
        private void loadRedis(String ipCountryTable) throws 
          IOException {
            
          FileReader freader = new FileReader(ipCountryTable);
          BufferedReader breader = new BufferedReader(freader);
          jedis = new Jedis("localhost");
          jedis.select(0);
          jedis.flushDB();
          String line = breader.readLine();
          while(line != null) {
            String[] tokens = line.split("\t");
            String ip = tokens[0];
            String country = tokens[1];
            jedis.set(ip, country);
            line = breader.readLine();
          }
          System.err.println("db size = " + jedis.dbSize());
        }
        
        public int run(String[] args) throws Exception {
            
          Path inputPath = new Path(args[0]);
          String ipCountryTable = args[1];
          Path outputPath = new Path(args[2]);
          
          loadRedis(ipCountryTable);
          
          Configuration conf = getConf();
          Job weblogJob = new Job(conf);
          weblogJob.setJobName("MapSideJoinRedis");
          weblogJob.setNumReduceTasks(0);
          weblogJob.setJarByClass(getClass());
          weblogJob.setMapperClass(WeblogMapper.class);        
          weblogJob.setMapOutputKeyClass(Text.class);
          weblogJob.setMapOutputValueClass(Text.class);
          weblogJob.setOutputKeyClass(Text.class);
          weblogJob.setOutputValueClass(Text.class);
          weblogJob.setInputFormatClass(TextInputFormat.class);
          weblogJob.setOutputFormatClass(TextOutputFormat.class);
          FileInputFormat.setInputPaths(weblogJob, inputPath);
          FileOutputFormat.setOutputPath(weblogJob, outputPath);
          
          
          if(weblogJob.waitForCompletion(true)) {
            return 0;
          }
          return 1;
        }
        
        public static void main(String[] args) throws Exception {
            int returnCode = ToolRunner.run(new 
                               MapSideJoinRedis(), args);
            System.exit(returnCode);
        }
    }
  3. Create a mapper that will join the apache_nobots_tsv.txt dataset with the nobots_ip_country_tsv.txt dataset that has been loaded to Redis:
    public class WeblogMapper extends Mapper<Object, Text, Text, Text> {
    
        private Map<String, String> ipCountryMap = new 
          HashMap<String, String>();
        private Jedis jedis = null;
        private Text outputKey = new Text();
        private Text outputValue = new Text();
        
        private String getCountry(String ip) {
            String country = ipCountryMap.get(ip);
            if (country == null) {
                if (jedis == null) {
                    jedis = new Jedis("localhost");
                    jedis.select(0);
                }
                country = jedis.get(ip);
                ipCountryMap.put(ip, country);
            }
            return country;
        }
    
        @Override
        protected void map(Object key, Text value, Context 
          context) throws IOException, InterruptedException {
            String row = value.toString();
            String[] tokens = row.split("\t");
            String ip = tokens[0];
            String country = getCountry(ip);
            outputKey.set(country);
            outputValue.set(row);
            context.write(outputKey, outputValue);
        }
        
    }
  4. Finally, launch the MapReduce job:
    $ hadoop jar AdvJoinChapter5-1.0-SNAPSHOT.jar com.packt.ch5.advjoin.redis.MapSideJoinRedis /user/hadoop/apache_nobots_tsv.txt ./nobots_ip_country_tsv.txt /user/hadoop/data_jnd

How it works...

In steps 1 and 2, we created a class to set up a map-only job. This class looks very similar to the other map-only jobs we've created in past recipes, except for the loadRedis() method.

The loadRedis() method first connects to the local Redis instance using the Jedis constructor. Next, we use the select() method to choose which Redis database to work with; a single Redis instance can contain a number of databases, which are identified by a numeric index. Once connected to the desired database, we call the flushDB() method, which deletes everything currently stored in that database. Finally, we read the nobots_ip_country_tsv.txt file from the current working folder and load each ip/country pair into Redis using the set() method.
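On the map side, the getCountry() method in WeblogMapper adds a small in-memory cache on top of Redis: each distinct IP is fetched over the network at most once per mapper, and repeated occurrences are served from a local HashMap. The pattern can be sketched in isolation as follows (this is an illustrative stand-alone class, not part of the recipe's code; a plain Map stands in for the Redis store):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the lazy-caching lookup used by WeblogMapper.getCountry().
// A plain Map plays the role of the remote Redis store.
public class CachedLookup {
    private final Map<String, String> remoteStore;                      // stand-in for Redis
    private final Map<String, String> cache = new HashMap<String, String>();
    private int remoteFetches = 0;                                      // counts simulated round trips

    public CachedLookup(Map<String, String> remoteStore) {
        this.remoteStore = remoteStore;
    }

    public String get(String key) {
        String value = cache.get(key);
        if (value == null) {
            value = remoteStore.get(key);                               // one "network" round trip
            remoteFetches++;
            cache.put(key, value);                                      // serve repeats locally
        }
        return value;
    }

    public int getRemoteFetches() {
        return remoteFetches;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<String, String>();
        store.put("10.1.2.3", "US");
        CachedLookup lookup = new CachedLookup(store);
        lookup.get("10.1.2.3");
        lookup.get("10.1.2.3");
        System.out.println("remote fetches: " + lookup.getRemoteFetches());
    }
}
```

Because a web log typically contains many records per client IP, this memoization turns what would be one Redis round trip per input record into one per distinct IP seen by the mapper.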

There's more...

This recipe used a very simple string data structure to store the ip/country key-value pairs. Redis supports many other data structures, including hashes, lists, and sorted sets. In addition, Redis has support for transactions and a publish/subscribe mechanism. Visit the Redis website, http://redis.io/, to review all of this functionality in depth.
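For instance, instead of one string key per IP, the whole ip/country table could live in a single Redis hash. The following sketch uses the Jedis hash methods; it assumes a Redis server running on localhost and the Jedis JAR on the classpath, and the hash name and sample values are made up for illustration:

```java
import redis.clients.jedis.Jedis;

// Illustrative sketch only: store the ip/country table in one Redis hash
// rather than one string key per IP. Requires a local Redis server.
public class IpCountryHash {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        jedis.select(0);
        // Each ip/country pair becomes a field/value inside the "ip_country" hash
        jedis.hset("ip_country", "10.1.2.3", "US");
        jedis.hset("ip_country", "10.4.5.6", "DE");
        // Look up a single field; hgetAll("ip_country") would return the whole table
        System.out.println(jedis.hget("ip_country", "10.1.2.3"));
        jedis.disconnect();
    }
}
```

Keeping the table in one hash groups the related keys together and makes it easy to fetch the entire mapping in a single call with hgetAll().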
