Joining data in the Mapper using MapReduce

Joining data in MapReduce is an expensive operation. Depending on the size of the datasets, you can choose to perform a map-side join or a reduce-side join. In a map-side join, two or more datasets are joined on a key in the map phase of a MapReduce job. In a reduce-side join, the mapper emits the join key, and the reduce phase is responsible for joining the two datasets. In this recipe, we will demonstrate how to perform a map-side join in MapReduce using the distributed cache. We will join a weblog dataset with a dataset containing a list of distinct IPs and their associated countries. As the datasets will be joined in the map phase, this will be a map-only job.

Getting ready

Download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support and place them into HDFS.

How to do it...

Carry out the following steps to join data in the map phase using MapReduce:

  1. Set up a map-only MapReduce job that will load the nobots_ip_country_tsv.txt dataset into the distributed cache:
    public class MapSideJoin extends Configured implements Tool {
        
      public int run(String[] args) throws Exception {
      
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        
        Configuration conf = getConf();
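        // Register the IP-to-country file with the distributed cache so the
        // framework copies it to every node that runs a map task.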
        DistributedCache.addCacheFile(new 
          URI("/user/hadoop/nobots_ip_country_tsv.txt"), conf);
        Job weblogJob = new Job(conf);
        weblogJob.setJobName("MapSideJoin");
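        // Zero reduce tasks makes this a map-only job; the join happens
        // entirely in the map phase.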
        weblogJob.setNumReduceTasks(0);
        weblogJob.setJarByClass(getClass());
        weblogJob.setMapperClass(WeblogMapper.class);        
        weblogJob.setMapOutputKeyClass(Text.class);
        weblogJob.setMapOutputValueClass(Text.class);
        weblogJob.setOutputKeyClass(Text.class);
        weblogJob.setOutputValueClass(Text.class);
        weblogJob.setInputFormatClass(TextInputFormat.class);
        weblogJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(weblogJob, inputPath);
        FileOutputFormat.setOutputPath(weblogJob, outputPath);
        
        
        if(weblogJob.waitForCompletion(true)) {
          return 0;
        }
        return 1;
      }
      
      public static void main( String[] args ) throws Exception {
        int returnCode = ToolRunner.run(new MapSideJoin(), args);
        System.exit(returnCode);
      }
    }
  2. Create a mapper that reads the nobots_ip_country_tsv.txt dataset from the distributed cache and stores the IP-to-country mappings in a HashMap:
    public class WeblogMapper extends Mapper<Object, Text, Text, Text> {
    
      public static final String IP_COUNTRY_TABLE_FILENAME = 
        "nobots_ip_country_tsv.txt";
      private Map<String, String> ipCountryMap = new 
        HashMap<String, String>();
      
      private Text outputKey = new Text();
      private Text outputValue = new Text();
      
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        // Locate the country lookup table among the files placed into the
        // distributed cache and load it into the in-memory HashMap.
        Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        for (Path p : files) {
          if (p.getName().equals(IP_COUNTRY_TABLE_FILENAME)) {
            BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
            String line = reader.readLine();
            while (line != null) {
              String[] tokens = line.split("\t");
              String ip = tokens[0];
              String country = tokens[1];
              ipCountryMap.put(ip, country);
              line = reader.readLine();
            }
            reader.close();
          }
        }

        if (ipCountryMap.isEmpty()) {
          throw new IOException("Unable to load IP country table.");
        }
      }
    
      @Override
      protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Join each weblog record with the in-memory table: look up the
        // record's IP and emit its country as the key, with the full record
        // as the value.
        String row = value.toString();
        String[] tokens = row.split("\t");
        String ip = tokens[0];
        String country = ipCountryMap.get(ip);
        outputKey.set(country);
        outputValue.set(row);
        context.write(outputKey, outputValue);
      }
        
    }
  3. Run the job:
    $ hadoop jar AdvJoinChapter5-1.0.jar com.packt.ch5.advjoin.mr.MapSideJoin /user/hadoop/apache_nobots_tsv.txt /user/hadoop/data_jnd
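Because this is a map-only job, the output directory will contain part-m-* files. You can spot-check a few joined records with a command along these lines (the output path matches the one passed to the job above):

    $ hadoop fs -cat /user/hadoop/data_jnd/part-m-00000 | head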

How it works...

In step 1, we called the following static method:

DistributedCache.addCacheFile(new URI("/user/hadoop/nobots_ip_country_tsv.txt"), conf)

This method will set the mapred.cache.files property in the job configuration. The mapred.cache.files property tells the MapReduce framework to distribute the nobots_ip_country_tsv.txt file to every node in the cluster that will launch a mapper (and reducer if your job is configured to run reducers).
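If you are running a newer Hadoop release (2.x or later), note that the DistributedCache class is deprecated there; a minimal sketch of the equivalent call, registering the cache file directly on the Job object, would be:

    Job weblogJob = Job.getInstance(conf, "MapSideJoin");
    weblogJob.addCacheFile(new URI("/user/hadoop/nobots_ip_country_tsv.txt"));

On those releases the framework also creates a symlink to the cached file in each task's working directory, so the mapper can typically open it by its plain file name.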

In step 2, we overrode the setup() method of the mapper. The setup() method is called by the MapReduce framework only once, prior to any calls to the map() method. This makes it an excellent place to perform any one-time initialization for the mapper class.

To read from the distributed cache, we used the static method DistributedCache.getLocalCacheFiles(context.getConfiguration()) to get all of the files that have been placed into the distributed cache. Next, we iterated over every file in the distributed cache (in this case there was only one) and loaded the nobots_ip_country_tsv.txt dataset into a HashMap.

Finally, in the map() method, we used the HashMap loaded in the setup() method to join the nobots_ip_country_tsv.txt and apache_nobots_tsv.txt datasets: for every record in apache_nobots_tsv.txt, we looked up the country associated with the record's IP and emitted it as the output key, with the original weblog record as the output value.
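The map() method, as written, assumes that every IP in apache_nobots_tsv.txt has an entry in the lookup table; if an IP were missing, ipCountryMap.get(ip) would return null and the record could not be written with a valid key. A minimal guard (a suggested refinement, not part of the recipe's code) looks like this:

    String country = ipCountryMap.get(ip);
    if (country == null) {
        // Tag unmatched records rather than failing or dropping them silently.
        country = "UNKNOWN";
    }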

There's more...

The MapReduce framework also supports distributing archive files using the distributed cache. An archive file can be a ZIP file, GZIP file, or even a JAR file. Once the archives have been distributed to the task nodes, they will be decompressed automatically.

To add an archive to the distributed cache, simply use the addCacheArchive() static method of the DistributedCache class when configuring the MapReduce job:

        DistributedCache.addCacheArchive(new URI("/user/hadoop/nobots_ip_country_tsv.zip"), conf);
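On the mapper side, the unpacked archive can be located with the DistributedCache.getLocalCacheArchives() method. The following is a minimal sketch that assumes the ZIP file contains nobots_ip_country_tsv.txt at its root and that the framework has unpacked it into a local directory named after the archive (the exact layout depends on how the archive was built):

        Path[] archives = DistributedCache.getLocalCacheArchives(context.getConfiguration());
        for (Path archive : archives) {
            if (archive.getName().equals("nobots_ip_country_tsv.zip")) {
                // The archive has been unpacked on the task node; read the
                // table file from the resulting local directory.
                File table = new File(archive.toString(), "nobots_ip_country_tsv.txt");
                BufferedReader reader = new BufferedReader(new FileReader(table));
                // ...parse the lines as in WeblogMapper's setup() method
                reader.close();
            }
        }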

See also

  • Joining data using Apache Pig replicated join
  • Joining sorted data using Apache Pig merge join
  • Joining skewed data using Apache Pig skewed join