Using MultipleOutputs in MapReduce to name output files

A common request among MapReduce users is to control the names of output files, replacing the default part-* names. This recipe shows how to use the MultipleOutputs class to write key-value pairs of different types to the same output file, under a name you choose.

Getting ready

You will need to download the ip-to-country.txt dataset from the Packt website, http://www.packtpub.com/support, and place the file in HDFS.

How to do it...

Follow these steps to use MultipleOutputs:

  1. Create a class named NamedCountryOutputJob and configure the MapReduce job:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.regex.Pattern;
    
    
    public class NamedCountryOutputJob implements Tool{
    
        private Configuration conf;
        public static final String NAME = "named_output";
    
    
        public static void main(String[] args) throws Exception {
            // Propagate the job's exit code to the shell.
            System.exit(ToolRunner.run(new Configuration(), new NamedCountryOutputJob(), args));
        }
        public int run(String[] args) throws Exception {
            if(args.length != 2) {
                System.err.println("Usage: named_output <input> <output>");
                return 1;
            }
    
            // On Hadoop 2 and later, Job.getInstance(conf, "...") replaces this deprecated constructor.
            Job job = new Job(conf, "IP count by country to named files");
            job.setInputFormatClass(TextInputFormat.class);
    
            job.setMapperClass(IPCountryMapper.class);
            job.setReducerClass(IPCountryReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setJarByClass(NamedCountryOutputJob.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // Exit code 0 signals success, non-zero signals failure.
            return job.waitForCompletion(true) ? 0 : 1;
    
        }
    
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        public Configuration getConf() {
            return conf;
        }
  2. Create a mapper that emits the country as the key and the number 1 as the value:
    public static class IPCountryMapper
                extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            private static final int country_pos = 1;
            private static final Pattern pattern = Pattern.compile("\t");
    
            @Override
            protected void map(LongWritable key, Text value,
                               Context context) throws IOException, InterruptedException {
                String country = pattern.split(value.toString())[country_pos];
                context.write(new Text(country), new IntWritable(1));
            }
        }
  3. Create a reducer that sums all of the country counts, and writes the output to separate files using MultipleOutputs:
    public static class IPCountryReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
    
            private MultipleOutputs output;
    
            @Override
            protected void setup(Context context
            ) throws IOException, InterruptedException {
                output = new MultipleOutputs(context);
            }
    
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int total = 0;
                for(IntWritable value: values) {
                    total += value.get();
                }
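                // Header line: with a NullWritable value, TextOutputFormat prints the key only.
                output.write(new Text("Output by MultipleOutputs"),
                             NullWritable.get(), key.toString());
                // Country and total, written to the same file named after the country.
                output.write(key, new IntWritable(total), key.toString());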
            }
    
            @Override
            protected void cleanup(Context context
            ) throws IOException, InterruptedException {
                output.close();
            }
        }
    }

Once the job completes successfully, you should see named output files under the provided output directory (for example, Qatar-r-#####, Turkey-r-#####).
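
The ##### suffix is the zero-padded partition number of the reduce task that wrote the file, so the exact name of a country's file depends on which reducer handled that key. A minimal sketch of the naming pattern in plain Java follows; reduceOutputName is a hypothetical helper written for illustration, not a Hadoop API:

```java
import java.util.Locale;

// Hypothetical helper, not part of Hadoop: mimics the "<base>-r-<nnnnn>" pattern
// seen on files written by a reducer through MultipleOutputs.
final class NamedOutputFileNames {

    // baseOutputPath is the third argument passed to output.write();
    // partition is the reduce task's partition number.
    static String reduceOutputName(String baseOutputPath, int partition) {
        return String.format(Locale.ROOT, "%s-r-%05d", baseOutputPath, partition);
    }

    public static void main(String[] args) {
        System.out.println(reduceOutputName("Qatar", 0));   // Qatar-r-00000
        System.out.println(reduceOutputName("Turkey", 3));  // Turkey-r-00003
    }
}
```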

How it works...

We first set up our job using the Tool interface provided by Hadoop. The run() method inside NamedCountryOutputJob checks that both the input and output HDFS paths are provided. It also sets the mapper and reducer classes and configures the InputFormat to read lines of text.

The mapper class defines a static constant for the position of the country field in each line, as well as a regex pattern that splits each line on tabs. For every input line, the mapper emits the country as the key and 1 as the value.
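
The field extraction can be tried outside Hadoop. The sketch below repeats the mapper's split-and-index logic in plain Java; the class name, the sample line, and the null guard for lines with too few fields are assumptions added for illustration (the recipe's mapper has no such guard and would throw on a line without a tab):

```java
import java.util.regex.Pattern;

// Stand-alone sketch of the mapper's parsing step; not part of the recipe's job.
final class CountryFieldSketch {

    private static final int COUNTRY_POS = 1;                  // second tab-separated field
    private static final Pattern TAB = Pattern.compile("\t");

    // Returns the country field, or null when the line has too few fields
    // (the null guard is an addition, not in the original mapper).
    static String countryOf(String line) {
        String[] fields = TAB.split(line);
        return fields.length > COUNTRY_POS ? fields[COUNTRY_POS] : null;
    }

    public static void main(String[] args) {
        // Made-up sample line; the real dataset's columns may differ.
        System.out.println(countryOf("1.2.3.4\tQatar"));
    }
}
```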

In the reduce phase, each reduce task runs the setup() routine once and initializes a MultipleOutputs instance named output.

Each call to reduce() receives a country and the list of 1s emitted for it, one per occurrence of that country in the dataset. We sum these into a final count. Before emitting the count, we use the output instance to write a header to the file: the key holds the header text, Output by MultipleOutputs, and the value is NullWritable because we don't need one. Passing key.toString() as the third argument directs the header to a custom file named after the current country. We then call output.write() again, this time with the input key as the output key, the final count as the output value, and key.toString() once more so that the record lands in the same file as the header.

The end result is a named country file containing both the header and the final tallied count for that country.
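
To make the resulting file contents concrete, here is a rough in-memory simulation of the reducer in plain Java. It is only a sketch: the NamedOutputSketch class and its map of file names to lines are hypothetical stand-ins for MultipleOutputs and HDFS, the counts are made up, and a null value plays the role of NullWritable:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the reducer and MultipleOutputs; no Hadoop involved.
final class NamedOutputSketch {

    // File name -> lines written to it.
    private final Map<String, List<String>> files = new LinkedHashMap<>();

    // Mimics a text output line: key only when the value is null, else key<TAB>value.
    void write(String key, Object value, String baseOutputPath) {
        String line = (value == null) ? key : key + "\t" + value;
        files.computeIfAbsent(baseOutputPath, k -> new ArrayList<>()).add(line);
    }

    // Mirrors the recipe's reduce(): sum the 1s, write a header, then the total.
    void reduce(String country, List<Integer> ones) {
        int total = 0;
        for (int one : ones) {
            total += one;
        }
        write("Output by MultipleOutputs", null, country);
        write(country, total, country);
    }

    Map<String, List<String>> files() {
        return files;
    }

    public static void main(String[] args) {
        NamedOutputSketch sketch = new NamedOutputSketch();
        sketch.reduce("Qatar", Arrays.asList(1, 1, 1));   // made-up counts
        sketch.reduce("Turkey", Arrays.asList(1, 1));
        sketch.files().forEach((name, lines) -> System.out.println(name + " -> " + lines));
    }
}
```

Each simulated file ends up with two lines: the header, followed by the country and its total separated by a tab.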

The write() overload used here takes its record writer from the job's output format, which defaults to TextOutputFormat, so we don't have to configure an OutputFormat class in our job setup routine. Also, we are not limited to just one concrete type for the reducer output key and value. We were able to output key-value pairs for both Text/NullWritable and Text/IntWritable to the exact same file.
