This recipe uses MapReduce to bulk load tab-separated ACLED event data directly into an Accumulo table.

This recipe is easiest to test on a pseudo-distributed Hadoop cluster with Accumulo 1.4.1 and Zookeeper 3.3.3 installed. The shell script in this recipe assumes that Zookeeper is running on the host localhost on port 2181; you can change this to suit your environment's needs. The Accumulo installation's bin folder needs to be on your environment path.
For this recipe, you'll need an Accumulo instance named test, with the user root and the password password (top-notch security, I know…). You will also need the dataset ACLED_nigeria_cleaned.tsv loaded into HDFS at the path /input/acled_cleaned/.

It is also highly recommended that you complete the Designing a row key to store geographic events in Accumulo recipe earlier in this chapter. This recipe uses the classes AccumuloTableAssistant.java and ACLEDRowIDGenerator.java, along with its parent interface RowIDGenerator.java, to help with the setup.
Follow these steps to bulk load events into Accumulo using MapReduce:
1. Set up a build environment that produces a JAR file named accumulo-examples.jar.

2. Create the package examples.accumulo and add RowIDGenerator.java, AccumuloTableAssistant.java, and ACLEDRowIDGenerator.java from the Designing a row key to store geographic events in Accumulo recipe.

3. In the same package, create the class ACLEDIngest.java with the following content:

    package examples.accumulo;

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.util.CachedConfiguration;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import java.io.IOException;
    import java.util.regex.Pattern;

    public class ACLEDIngest extends Configured implements Tool {

        private Configuration conf;

        public ACLEDIngest(Configuration conf) {
            this.conf = conf;
        }
4. The run() method is where we create and submit the job:

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 8) {
            System.err.println(printUsage());
            System.exit(0);
        }
        Job job = new Job(conf, "ACLED ingest to Accumulo");
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(ACLEDIngestMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(ACLEDIngestReducer.class);
        job.setPartitionerClass(RangePartitioner.class);
        job.setJarByClass(getClass());
        String input = args[0];
        String outputStr = args[1];
        String instanceName = args[2];
        String tableName = args[3];
        String user = args[4];
        String pass = args[5];
        String zooQuorum = args[6];
        String localSplitFile = args[7];
        FileInputFormat.addInputPath(job, new Path(input));
        AccumuloFileOutputFormat.setOutputPath(job, clearOutputDir(outputStr));
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
5. Create an AccumuloTableAssistant instance to help create and presplit the acled table:

        AccumuloTableAssistant tableAssistant = new AccumuloTableAssistant.Builder()
                .setInstanceName(instanceName)
                .setTableName(tableName)
                .setUser(user)
                .setPassword(pass)
                .setZooQuorum(zooQuorum)
                .build();
        String splitFileInHDFS = "/tmp/splits.txt";
        int numSplits = 0;
        tableAssistant.createTableIfNotExists();
        if (localSplitFile != null) {
            numSplits = tableAssistant.presplitAndWriteHDFSFile(
                    conf, localSplitFile, splitFileInHDFS);
        }
        RangePartitioner.setSplitFile(job, splitFileInHDFS);
        job.setNumReduceTasks(numSplits + 1);
        if (job.waitForCompletion(true)) {
            tableAssistant.loadImportDirectory(conf, outputStr);
        }
        return 0;
    }
6. Add the private methods printUsage() and clearOutputDir() to show the argument order and to automatically clear the supplied output directory:

    private String printUsage() {
        return "<input> <output> <instance_name> <tablename> "
                + "<username> <password> <zoohosts> <splits_file_path>";
    }

    private Path clearOutputDir(String outputStr) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(outputStr);
        fs.delete(path, true);
        return path;
    }
7. Add the static inner class ACLEDIngestMapper:

    public static class ACLEDIngestMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        private Text outKey = new Text();
        private static final Pattern tabPattern = Pattern.compile("[\t]");
        private ACLEDRowIDGenerator gen = new ACLEDRowIDGenerator();

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = tabPattern.split(value.toString());
            if (values.length == 8) {
                // lat, lon, timestamp
                String[] rowKeyFields =
                        new String[]{values[4], values[5], values[1]};
                outKey.set(gen.getRowID(rowKeyFields));
                context.write(outKey, value);
            } else {
                context.getCounter("ACLED Ingest",
                        "malformed records").increment(1L);
            }
        }
    }
8. Add the static inner class ACLEDIngestReducer:

    public static class ACLEDIngestReducer
            extends Reducer<Text, Text, Key, Value> {

        private Key outKey;
        private Value outValue = new Value();
        private Text cf = new Text("cf");
        private Text qual = new Text();
        private static final Pattern tabPattern = Pattern.compile("[\t]");

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int found = 0;
            for (Text value : values) {
                String[] cells = tabPattern.split(value.toString());
                if (cells.length == 8) {
                    // don't write duplicates
                    if (found < 1) {
                        write(context, key, cells[3], "atr");
                        write(context, key, cells[1], "dtg");
                        write(context, key, cells[7], "fat");
                        write(context, key, cells[4], "lat");
                        write(context, key, cells[0], "loc");
                        write(context, key, cells[5], "lon");
                        write(context, key, cells[6], "src");
                        write(context, key, cells[2], "type");
                    } else {
                        context.getCounter("ACLED Ingest",
                                "duplicates").increment(1L);
                    }
                    // only valid lines count toward duplicate detection
                    found++;
                } else {
                    context.getCounter("ACLED Ingest",
                            "malformed records missing a field").increment(1L);
                }
            }
        }
9. Add the private write() method used by the reducer, then close the reducer and override setConf() and getConf() on the outer class:

        private void write(Context context, Text key, String cell, String qualStr)
                throws IOException, InterruptedException {
            if (!cell.toUpperCase().equals("NULL")) {
                qual.set(qualStr);
                outKey = new Key(key, cf, qual, System.currentTimeMillis());
                outValue.set(cell.getBytes());
                context.write(outKey, outValue);
            }
        }
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }
10. Finally, add a main() method that submits the job through the ToolRunner class:

    public static void main(String[] args) throws Exception {
        Configuration conf = CachedConfiguration.getInstance();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        ToolRunner.run(new ACLEDIngest(conf), args);
    }
    }
11. Build accumulo-examples.jar and copy it to the base working directory.

12. Create the file splits.txt in the base working folder containing the strings 00, 01, 10, and 11, each on its own line.

13. Create the script bulk_ingest.sh in the base working folder with the following contents:

    tool.sh accumulo-examples.jar examples.accumulo.ACLEDIngest \
        /input/acled_cleaned/ \
        /output/accumulo_acled_load/ \
        test acled root password localhost:2181 splits.txt

14. Run the script. Once the job completes, you should see the loaded data in the table acled in Accumulo.
The program takes eight arguments, each of which is important. The input location is where MapReduce will find the ACLED data. The output folder is where it will write data in Accumulo's native RFile format. The string test is the name of our test Accumulo instance stored in Zookeeper. The string acled is our desired table name in Accumulo. We authenticate with the Accumulo instance using the credentials root:password. For this execution, we supplied one Zookeeper host on localhost:2181. Finally, splits.txt is used to help presplit our newly created acled table.
The program clears any previous folder located at our output location and configures the AccumuloFileOutputFormat to write to that location. For this job, the mapper outputs the type Text for both the key and the value.
AccumuloTableAssistant utilizes the Builder pattern to chain setter calls for object instantiation, which helps avoid accidentally misplacing arguments at construction time. We create the table acled if it does not exist, and use the assistant to presplit the table based on our locally supplied splits.txt file. Without presplitting the table at creation time, the RangePartitioner class would force all of the intermediate key-value pairs to a single reducer. It is much more efficient to create presplit tablets based on the expected row-key distribution and to allow multiple reducers to build RFiles in parallel. We set the number of reducers to the number of entries in our splits.txt file plus one, to handle keys that fall above our highest split point (11). Finally, we are ready to submit the job and to examine the map and reduce phases.
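The presplit work itself happens inside AccumuloTableAssistant, whose source accompanies the earlier recipe. As a rough sketch of what presplitAndWriteHDFSFile boils down to on the Accumulo side (the helper and variable names here are assumptions, not the recipe's exact code), the assistant reads the local split file and registers each line as a tablet boundary:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.hadoop.io.Text;

    public class PresplitSketch {
        // Hypothetical stand-in for the presplit step inside AccumuloTableAssistant
        public static void presplit(String instanceName, String zooQuorum,
                                    String user, String pass,
                                    String tableName, String localSplitFile)
                throws Exception {
            Connector conn = new ZooKeeperInstance(instanceName, zooQuorum)
                    .getConnector(user, pass.getBytes());
            SortedSet<Text> splits = new TreeSet<Text>();
            BufferedReader reader = new BufferedReader(new FileReader(localSplitFile));
            String line;
            while ((line = reader.readLine()) != null) {
                splits.add(new Text(line.trim())); // one split point per line: 00, 01, 10, 11
            }
            reader.close();
            // one tablet boundary per split point, so reducers can write RFiles in parallel
            conn.tableOperations().addSplits(tableName, splits);
        }
    }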
Each map task JVM creates an internal instance of ACLEDRowIDGenerator. See the Designing a row key to store geographic events in Accumulo recipe in this chapter for an in-depth explanation of how this class works. Our data is tab-delimited and follows a very strict column ordering, so we can hand-pick the column indices to read the values for lat, lon, and dtg, in that order. The key generator needs these fields to build a valid composite geospatial and reverse-timestamp rowID. We output the generated row key with the text value that was read for the line. This produces a distinct intermediate key for every unique rowID we wish to insert into Accumulo.
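The full composite-key logic belongs to ACLEDRowIDGenerator from the earlier recipe. As a simplified illustration of just the reverse-timestamp portion (the date format below is an assumption about this dataset), subtracting the epoch time from Long.MAX_VALUE makes newer events sort lexicographically first:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class ReverseTimestampSketch {
        // Illustration only; the real composite rowID logic lives in
        // ACLEDRowIDGenerator from the row key design recipe.
        public static String reverseTimestamp(String dtg) throws Exception {
            // assumes the dtg column is formatted yyyy-MM-dd
            Date d = new SimpleDateFormat("yyyy-MM-dd").parse(dtg);
            // zero-pad so string comparison matches numeric comparison;
            // newer events produce smaller values and therefore sort first
            return String.format("%019d", Long.MAX_VALUE - d.getTime());
        }
    }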
The reducer is responsible for taking our generated rowID and reading through any other delimited lines that produced an equivalent rowID. The rowID generator in the map phase builds unique rowIDs based on the composite of lat, lon, and dtg. By definition, ACLED events that took place at the exact same lat/lon with the same reverse timestamp are grouped to the same intermediate key for the reducer. However, multiple ACLED events with the exact same rowID are duplicate entries that we wish to ignore, so we only preserve the first value collected from the Iterable object; this job does not do any duplicate merging. We use counters to keep track of duplicate occurrences, as well as of invalid lines that don't split properly. Since we are directly writing Key/Value instances as RFiles, Accumulo requires the key/value objects to be inserted in sorted order. The rowID is the same for every qualifier in a given record, and the column family is the static label cf, but it is very important that we maintain lexicographical ordering when choosing the write order of our qualifier labels. Fortunately, our data is very predictable, and we hardcode the column value reads based on the alphabetic ordering of our qualifier labels.
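If you ever rename or reorder the qualifiers, a tiny sanity check like the following (not part of the recipe's code) confirms that the hardcoded write order is still lexicographically sorted:

    public class QualifierOrderCheck {
        public static void main(String[] args) {
            // the reducer must emit qualifiers in sorted order to produce valid RFiles
            String[] quals = {"atr", "dtg", "fat", "lat", "loc", "lon", "src", "type"};
            for (int i = 1; i < quals.length; i++) {
                if (quals[i - 1].compareTo(quals[i]) >= 0) {
                    throw new IllegalStateException("out of order: " + quals[i]);
                }
            }
            System.out.println("qualifier write order is lexicographically sorted");
        }
    }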
Once the job is finished and we have all of the RFiles for the presplit tablets, we use the assistant instance to read all of the files produced in the output directory and place each in the appropriate tablet. The data is immediately available for querying in the acled table in Accumulo.
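A minimal sketch of what loadImportDirectory might do under the hood is shown below; the importDirectory() call reflects the Accumulo 1.4 TableOperations signature (it differs in later releases), and the failure directory name is an assumption for illustration:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BulkLoadSketch {
        // Hypothetical stand-in for AccumuloTableAssistant.loadImportDirectory()
        public static void load(Configuration conf, String instanceName,
                                String zooQuorum, String user, String pass,
                                String tableName, String rfileDir) throws Exception {
            Connector conn = new ZooKeeperInstance(instanceName, zooQuorum)
                    .getConnector(user, pass.getBytes());
            // Accumulo moves any RFiles it cannot assign into the failure directory
            String failureDir = rfileDir + "_failures";
            FileSystem.get(conf).mkdirs(new Path(failureDir));
            conn.tableOperations().importDirectory(tableName, rfileDir, failureDir, false);
        }
    }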
Here is a bit more explanation on some of the design choices you see in this recipe:
The AccumuloTableAssistant class is designed for reuse across different Accumulo data loading and management applications. Since it requires five input strings for operation, the Builder pattern was an obvious choice to prevent accidental variable assignment at construction. See Effective Java, Second Edition, by Joshua Bloch for more detail on the Builder design pattern.
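For reference, here is a minimal sketch of the Builder idiom as it might appear in AccumuloTableAssistant; the field and method names mirror the setter calls used earlier, but the real implementation ships with the row key recipe:

    public class AccumuloTableAssistant {
        private final String instanceName, tableName, user, password, zooQuorum;

        private AccumuloTableAssistant(Builder b) {
            this.instanceName = b.instanceName;
            this.tableName = b.tableName;
            this.user = b.user;
            this.password = b.password;
            this.zooQuorum = b.zooQuorum;
        }

        public static class Builder {
            private String instanceName, tableName, user, password, zooQuorum;

            public Builder setInstanceName(String v) { instanceName = v; return this; }
            public Builder setTableName(String v)    { tableName = v;    return this; }
            public Builder setUser(String v)         { user = v;         return this; }
            public Builder setPassword(String v)     { password = v;     return this; }
            public Builder setZooQuorum(String v)    { zooQuorum = v;    return this; }

            // each setter returns this, so five string arguments can never be
            // silently swapped the way they could in a five-arg constructor
            public AccumuloTableAssistant build() {
                return new AccumuloTableAssistant(this);
            }
        }
    }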
The choice of 00, 01, 10, and 11 as split points was entirely arbitrary; it was meant to emphasize the importance of presplitting Accumulo tables at creation time. Choosing the right split points depends on the distribution of your rowID ranges. With too few split points, job throughput will bottleneck at the reduce stage; with too many, you may waste resources and spin-up time on underutilized reduce task JVMs.
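One way to move beyond arbitrary split points is to sample generated rowIDs and take evenly spaced entries from the sorted sample. The helper below is purely illustrative and not part of the recipe:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SplitPointSampler {
        // Derive numSplits split points from a sample of generated rowIDs by
        // taking evenly spaced entries from the sorted sample. The sample
        // should be considerably larger than numSplits to be meaningful.
        public static List<String> pickSplits(List<String> sampledRowIds, int numSplits) {
            List<String> sorted = new ArrayList<String>(sampledRowIds);
            Collections.sort(sorted);
            List<String> splits = new ArrayList<String>();
            for (int i = 1; i <= numSplits; i++) {
                splits.add(sorted.get(i * sorted.size() / (numSplits + 1)));
            }
            return splits;
        }
    }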
If you need to ingest data at massive volume, AccumuloFileOutputFormat is the obvious choice. Producing RFiles for direct insertion into tablets avoids the overhead AccumuloOutputFormat incurs by writing mutations directly to the Accumulo table. On the other hand, if your MapReduce job is not write-intensive, it can be easier to work directly with Mutation instances instead of RFiles. Moreover, if your job does not require reduction and is map-only, AccumuloOutputFormat and writing direct mutations would be a much simpler design choice.
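For comparison, here is a minimal sketch of that map-only alternative, writing Mutation instances through AccumuloOutputFormat. The configuration calls follow the Accumulo 1.4 API (they changed in later releases), and rowIdFor() is a hypothetical stand-in for a real row key generator:

    import java.io.IOException;

    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    public class DirectMutationSketch {

        public static class MutationMapper
                extends Mapper<LongWritable, Text, Text, Mutation> {
            // output key is the destination table name, output value is the Mutation
            private Text table = new Text("acled");

            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                Mutation m = new Mutation(new Text(rowIdFor(value)));
                m.put(new Text("cf"), new Text("raw"),
                        new Value(value.toString().getBytes()));
                context.write(table, m);
            }

            // hypothetical placeholder: a real job would reuse ACLEDRowIDGenerator
            private String rowIdFor(Text line) {
                String s = line.toString();
                return s.substring(0, Math.min(10, s.length()));
            }
        }

        public static void configure(Job job, String instance, String zooQuorum,
                                     String user, String pass) {
            job.setOutputFormatClass(AccumuloOutputFormat.class);
            job.setNumReduceTasks(0); // map-only: no sort/shuffle, no RFiles
            AccumuloOutputFormat.setZooKeeperInstance(
                    job.getConfiguration(), instance, zooQuorum);
            AccumuloOutputFormat.setOutputInfo(
                    job.getConfiguration(), user, pass.getBytes(), true, "acled");
        }
    }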