A common request among MapReduce users is to control output file names to something other than part-*. This recipe shows how you can use the MultipleOutputs class to emit different key-value pairs to a named file of your choosing.
You will need to download the ip-to-country.txt dataset from the Packt website, http://www.packtpub.com/support, and place the file in HDFS.

Follow these steps to use MultipleOutputs:
1. Create a class named NamedCountryOutputJob and configure the MapReduce job:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import java.io.IOException;
    import java.util.regex.Pattern;

    public class NamedCountryOutputJob implements Tool {

        private Configuration conf;
        public static final String NAME = "named_output";

        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Configuration(), new NamedCountryOutputJob(), args);
        }

        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: named_output <input> <output>");
                System.exit(1);
            }

            Job job = new Job(conf, "IP count by country to named files");
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(IPCountryMapper.class);
            job.setReducerClass(IPCountryReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setJarByClass(NamedCountryOutputJob.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Return 0 on success, per the Tool convention for exit codes.
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public void setConf(Configuration conf) {
            this.conf = conf;
        }

        public Configuration getConf() {
            return conf;
        }
2. Create a mapper to emit the key-value pair country and the number 1:

    public static class IPCountryMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int COUNTRY_POS = 1;
        private static final Pattern pattern = Pattern.compile("\t");

        @Override
        protected void map(LongWritable key, Text value,
                           Context context)
                throws IOException, InterruptedException {
            // The country is the second tab-delimited field on each line.
            String country = pattern.split(value.toString())[COUNTRY_POS];
            context.write(new Text(country), new IntWritable(1));
        }
    }
3. Create a reducer that sums the country counts, and writes the output to separate files using MultipleOutputs:

    public static class IPCountryReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Declared as a raw type so that both Text/NullWritable and
        // Text/IntWritable pairs can be written to the same file.
        private MultipleOutputs output;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            output = new MultipleOutputs(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                              Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable value : values) {
                total += value.get();
            }
            // Write a header line, then the final count, to a file
            // whose base name is the country itself.
            output.write(new Text("Output by MultipleOutputs"),
                    NullWritable.get(), key.toString());
            output.write(key, new IntWritable(total), key.toString());
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            output.close();
        }
    } // closes NamedCountryOutputJob
Once the job completes successfully, you should see named output files under the provided output directory (for example, Qatar-r-#####, Turkey-r-#####).
We first set up our job using the Tool interface provided by Hadoop. The run() method inside NamedCountryOutputJob checks that both input and output HDFS paths are provided as arguments. It also sets the mapper and reducer classes, and configures TextInputFormat so the job reads the input as lines of text.
The mapper class defines a statically initialized position for reading the country from each line, as well as the regex pattern used to split each line on tabs. For every line a country appears on, the mapper outputs that country as the key and 1 as the value.
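To make the parsing step concrete, here is a minimal, Hadoop-free sketch of the mapper's field extraction. The sample record and its tab-delimited ip<TAB>country layout are assumptions about the ip-to-country.txt format for illustration, not values taken from the dataset:

```java
import java.util.regex.Pattern;

public class CountryParseDemo {

    // Same split pattern and field position the mapper uses.
    private static final Pattern TAB = Pattern.compile("\t");
    private static final int COUNTRY_POS = 1;

    // Extracts the country field exactly as the mapper's map() does.
    static String extractCountry(String line) {
        return TAB.split(line)[COUNTRY_POS];
    }

    public static void main(String[] args) {
        // Hypothetical record in the assumed ip<TAB>country layout.
        String record = "111.111.111.111\tQatar";
        System.out.println(extractCountry(record)); // prints "Qatar"
    }
}
```

Each such extracted country then becomes a map output key paired with the value 1.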
At the reduce phase, each task JVM runs the setup() routine, which initializes a MultipleOutputs instance named output.
Each call to reduce() presents a country and a tally of every occurrence of that country in the dataset, which we sum into a final count. Before emitting the final count, we use the output instance to write a header to the file. The key contains the header text Output by MultipleOutputs, and we null out the value since we don't need it; passing key.toString() as the base output path writes the header to a custom file named after the current country. On the next line we call output.write() again, this time with the input key as the output key, the final count as the output value, and the same key.toString() base path, so this record lands in the same file as the header.
The end result is a named country file containing both the header and the final tallied count for that country.
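The layout of one such file can be sketched without Hadoop. The snippet below mimics how TextOutputFormat renders each write() call (key, a tab separator, then the value, with the separator and value omitted when the value is NullWritable, as for the header line); the country and count shown are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class NamedFileContentDemo {

    // Mimics TextOutputFormat's line rendering: "key<TAB>value", or just
    // the key when there is no value (the NullWritable header case).
    static String formatLine(String key, String value) {
        return value == null ? key : key + "\t" + value;
    }

    public static void main(String[] args) {
        List<String> qatarFile = new ArrayList<>();
        // First write(): the header, with the value nulled out.
        qatarFile.add(formatLine("Output by MultipleOutputs", null));
        // Second write(): the country and a hypothetical final tally.
        qatarFile.add(formatLine("Qatar", "77"));
        // Prints the two lines a file like Qatar-r-00000 would contain.
        qatarFile.forEach(System.out::println);
    }
}
```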
By using MultipleOutputs, we don't have to configure a custom OutputFormat class in our job setup routine. Nor are we limited to just one concrete type for the reducer output key and value: we were able to output key-value pairs of both Text/NullWritable and Text/IntWritable to the exact same file.