Using MapReduce to bulk import geographic event data into Accumulo

This recipe will use MapReduce to load tab-separated ACLED event data directly into an Accumulo table.

Getting ready

This recipe is easiest to test on a pseudo-distributed Hadoop cluster with Accumulo 1.4.1 and Zookeeper 3.3.3 installed. The shell script in this recipe assumes that Zookeeper is running on the host localhost on port 2181; you can change this to suit your environment. The Accumulo installation's bin folder needs to be on your environment path.

For this recipe, you'll need to create an Accumulo instance named test with the user root and the password password (top-notch security, I know…).

You will need the dataset ACLED_nigeria_cleaned.tsv loaded into HDFS at the path /input/acled_cleaned/.

It is also highly recommended that you complete the Designing a row key to store geographic events in Accumulo recipe earlier in this chapter. This recipe uses the classes AccumuloTableAssistant.java and ACLEDRowIDGenerator.java, as well as the latter's parent interface RowIDGenerator.java, to help with the setup.

How to do it...

Follow these steps to bulk load events into Accumulo using MapReduce:

  1. Open the Java IDE editor of your choice.
  2. Create a build template that produces a JAR file named accumulo-examples.jar.
  3. Create the package examples.accumulo and add RowIDGenerator.java, AccumuloTableAssistant.java, and ACLEDRowIDGenerator.java.
  4. You will need to configure the Accumulo core and Hadoop classpath dependencies.
  5. Create the class ACLEDIngest.java with the following content:
    package examples.accumulo;
    
    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.util.CachedConfiguration;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.regex.Pattern;
    
    public class ACLEDIngest extends Configured implements Tool {
    
    
        private Configuration conf;
    
        public ACLEDIngest(Configuration conf) {
            this.conf = conf;
        }
  6. The run() method is where we create and submit the job.
        @Override
        public int run(String[] args) throws Exception {
    
            if(args.length < 8) {
                System.err.println(printUsage());
                System.exit(0);
            }
    
            Job job = new Job(conf, "ACLED ingest to Accumulo");
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(ACLEDIngestMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(ACLEDIngestReducer.class);
            job.setPartitionerClass(RangePartitioner.class);
            job.setJarByClass(getClass());
    
            String input = args[0];
            String outputStr = args[1];
            String instanceName = args[2];
            String tableName = args[3];
            String user = args[4];
            String pass = args[5];
            String zooQuorum = args[6];
            String localSplitFile = args[7];
    
            FileInputFormat.addInputPath(job, new Path(input));
            AccumuloFileOutputFormat.setOutputPath(job,
                    clearOutputDir(outputStr));
            job.setOutputFormatClass(AccumuloFileOutputFormat.class);
  7. Create the AccumuloTableAssistant instance to help create and presplit the acled table.
            AccumuloTableAssistant tableAssistant =
                    new AccumuloTableAssistant.Builder()
                            .setInstanceName(instanceName)
                            .setTableName(tableName)
                            .setUser(user)
                            .setPassword(pass)
                            .setZooQuorum(zooQuorum)
                            .build();
    
            String splitFileInHDFS = "/tmp/splits.txt";
            int numSplits = 0;
            tableAssistant.createTableIfNotExists();
            if(localSplitFile != null) {
                numSplits = tableAssistant.presplitAndWriteHDFSFile(conf, localSplitFile, splitFileInHDFS);
            }
            RangePartitioner.setSplitFile(job, splitFileInHDFS);
            job.setNumReduceTasks(numSplits + 1);
    
            if(job.waitForCompletion(true)) {
                tableAssistant.loadImportDirectory(conf, outputStr);
            }
            return 0;
        }
  8. Create printUsage() and clearOutputDir() to show argument order and to automatically clear the supplied output directory.
        private String printUsage() {
            return "<input> <output> <instance_name> <tablename> " +
                   "<username> <password> <zoohosts> <splits_file_path>";
        }
    
        private Path clearOutputDir(String outputStr)
                throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(outputStr);
            fs.delete(path, true);
            return path;
        }
  9. Create a static nested mapper class named ACLEDIngestMapper.
        public static class ACLEDIngestMapper
                extends Mapper<LongWritable, Text, Text, Text> {
    
            private Text outKey = new Text();
            private static final Pattern tabPattern =
                    Pattern.compile("[\t]");
            private ACLEDRowIDGenerator gen =
                    new ACLEDRowIDGenerator();
    
            @Override
            protected void map(LongWritable key, Text value,
                               Context context)
                    throws IOException, InterruptedException {
    
                String[] values =
                        tabPattern.split(value.toString());
                if(values.length == 8) {
                    // lat, lon, timestamp
                    String[] rowKeyFields = new String[]
                            {values[4], values[5], values[1]};
    
                    outKey.set(gen.getRowID(rowKeyFields));
                    context.write(outKey, value);
                } else {
                    context.getCounter("ACLED Ingest",
                            "malformed records").increment(1L);
                }
            }
        }
  10. Create a static nested reducer class named ACLEDIngestReducer.
        public static class ACLEDIngestReducer
                extends Reducer<Text, Text, Key, Value> {
    
            private Key outKey;
            private Value outValue = new Value();
            private Text cf = new Text("cf");
            private Text qual = new Text();
            private static final Pattern tabPattern =
                    Pattern.compile("[\t]");
    
            @Override
            protected void reduce(Text key, Iterable<Text> values,
                                  Context context) throws 
                          IOException, InterruptedException {
    
                int found = 0;
                for(Text value : values) {
                    String[] cells =
                            tabPattern.split(value.toString());
                    if(cells.length == 8) {
                        // don't write duplicates
                        if(found < 1) {
                            // qualifiers must be written in lexicographic
                            // order: atr, dtg, fat, lat, loc, lon, src, type
                            write(context, key, cells[3], "atr");
                            write(context, key, cells[1], "dtg");
                            write(context, key, cells[7], "fat");
                            write(context, key, cells[4], "lat");
                            write(context, key, cells[0], "loc");
                            write(context, key, cells[5], "lon");
                            write(context, key, cells[6], "src");
                            write(context, key, cells[2], "type");
                        } else {
                            context.getCounter("ACLED Ingest",
                                    "duplicates").increment(1L);
                        }
                    } else {
                        context.getCounter("ACLED Ingest",
                                "malformed records missing a field").increment(1L);
                    }
                    found++;
                }
            }
  11. Create the following method inside the reduce class to help output key-value pairs:
            private void write(Context context, Text key,
                               String cell, String qualStr)
                    throws IOException, InterruptedException {
                if(!cell.equalsIgnoreCase("NULL")) {
                    qual.set(qualStr);
                    outKey = new Key(key, cf, qual,
                            System.currentTimeMillis());
                    outValue.set(cell.getBytes());
                    context.write(outKey, outValue);
                }
            }
    
        }
    
        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        @Override
        public Configuration getConf() {
            return conf;
        }
  12. Add a main() method that submits an instance of your job to the ToolRunner class.
        public static void main(String[] args) throws Exception {
            Configuration conf = 
                                CachedConfiguration.getInstance();
            args = new GenericOptionsParser(conf, 
                                         args).getRemainingArgs();
            ToolRunner.run(new ACLEDIngest(conf), args);
        }
    }
  13. Save the code and build accumulo-examples.jar to the base working directory.
  14. Create a file named splits.txt in the base working folder containing the strings 00, 01, 10, and 11, each on its own line.
  15. Create a launcher shell script named bulk_ingest.sh in the base working folder that has the following contents:
    tool.sh accumulo-examples.jar examples.accumulo.ACLEDIngest \
        /input/acled_cleaned/ \
        /output/accumulo_acled_load/ \
        test \
        acled \
        root \
        password \
        localhost:2181 \
        splits.txt
  16. Run the script. You should see the job execute in the MapReduce WebUI. Upon completion, the ACLED data should be available for scan under the table acled in Accumulo.

How it works...

The program takes eight arguments, each of which is important. The input location is where MapReduce will find the ACLED data. The output folder is where it will write data in Accumulo's native RFile format. The string test is the name of our test Accumulo instance, as registered in Zookeeper. The string acled is our desired table name in Accumulo. We authenticate with the Accumulo instance as the user root with the password password. For this execution, we supplied one Zookeeper host, localhost:2181. Finally, splits.txt is used to presplit our newly created acled table.

The program clears any previous folder located in our output location. We configure the AccumuloFileOutputFormat to write to this location. For this job, the mapper will output the type Text for both the key and the value.

AccumuloTableAssistant utilizes the Builder pattern to chain setter calls for object instantiation and helps avoid accidentally misplacing the arguments during construction time. We'll create the table acled if it does not exist, and will use the assistant to presplit the table based on our locally supplied splits.txt file. Without presplitting the table at creation time, the RangePartitioner class would force all of the intermediate key-value pairs to a single reducer. It is much more efficient to create presplit tablets based on expected row-key distribution and to allow multiple reducers to build RFiles in parallel. We set the number of reducers to the number of entries in our splits.txt file plus 1 to handle keys that fall above our highest split point (11). Finally, we are ready to submit the job and to examine the map and reduce phases.
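
The presplit helper itself is defined in the earlier recipe; what follows is a rough sketch of what presplitAndWriteHDFSFile() does, assuming the Accumulo 1.4 client API. The standalone class and its signature here are illustrative, not the recipe's exact code:

    // Hypothetical sketch of the assistant's presplit helper (Accumulo 1.4 API).
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    public class PresplitSketch {

        // Reads one split point per line (00, 01, 10, 11), registers the
        // splits with the table, copies the file to HDFS for RangePartitioner,
        // and returns the number of split points.
        public static int presplitAndWriteHDFSFile(Configuration conf,
                String instanceName, String zooQuorum, String user, String pass,
                String tableName, String localSplitFile, String splitFileInHDFS)
                throws Exception {
            SortedSet<Text> splits = new TreeSet<Text>();
            BufferedReader reader =
                    new BufferedReader(new FileReader(localSplitFile));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    splits.add(new Text(line.trim()));
                }
            } finally {
                reader.close();
            }
            Connector connector = new ZooKeeperInstance(instanceName, zooQuorum)
                    .getConnector(user, pass.getBytes());
            connector.tableOperations().addSplits(tableName, splits);
            FileSystem.get(conf).copyFromLocalFile(new Path(localSplitFile),
                    new Path(splitFileInHDFS));
            return splits.size();
        }
    }

Returning the split count lets the job size itself at numSplits + 1 reducers, as the run() method above does.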

Each map task JVM creates an internal instance of ACLEDRowIDGenerator. See the Designing a row key to store geographic events in Accumulo recipe in this chapter for an in-depth explanation of how this class works. Our data is tab-delimited and follows a very strict column ordering, thus we can hand-pick the column indices to read the values for lat, lon, and dtg in that respective order. The key generator needs these fields to make a valid composite geospatial and reverse timestamp rowID. We output the generated row key with the text value that was read for the line. This produces a distinct intermediate key for every unique rowID we wish to insert into Accumulo.

The reducer takes each generated rowID and reads through every delimited line that produced an equivalent rowID. The rowID generator in the map phase builds rowIDs from the composite of lat, lon, and dtg, so an ACLED event that took place at the exact same lat/lon with the same reverse timestamp is grouped to the same intermediate key for the reducer. Multiple ACLED events with the exact same rowID are therefore duplicate entries we wish to ignore, so we preserve only the first value collected from the Iterable object; this job does not do any duplicate merging. We use counters to keep track of duplicate occurrences as well as invalid lines that don't split properly. Since we are writing Key/Value instances directly to RFiles, Accumulo requires them to be inserted in sorted order. The rowID is the same for every qualifier and the column family is the static label cf, so what matters is maintaining lexicographical ordering across the qualifier labels. Fortunately, our data is very predictable, and we hardcode the column value reads in the alphabetic order of the qualifier labels.

Once the job is finished and we have all of the RFiles for the presplit tablets, we use the assistant instance to read every file produced in the output directory and place it in the appropriate tablet. The data is immediately available for querying in the acled table in Accumulo.
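
The bulk load at the end presumably boils down to Accumulo's importDirectory() call on TableOperations; here is a minimal sketch under that assumption. The wrapper class and the failure-directory convention are ours, not the recipe's exact code:

    // Hypothetical sketch of the bulk-import step behind loadImportDirectory()
    // (Accumulo 1.4 TableOperations API).
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BulkImportSketch {

        public static void loadImportDirectory(Configuration conf,
                String instanceName, String zooQuorum, String user, String pass,
                String tableName, String rfileDir) throws Exception {
            // Accumulo wants an empty failure directory for any files
            // it cannot import.
            String failureDir = rfileDir + "_failures";
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(failureDir), true);
            fs.mkdirs(new Path(failureDir));

            Connector connector = new ZooKeeperInstance(instanceName, zooQuorum)
                    .getConnector(user, pass.getBytes());
            // Hands every RFile under rfileDir to the appropriate tablet
            // servers; no mutations pass through the normal write path.
            connector.tableOperations().importDirectory(tableName, rfileDir,
                    failureDir, false);
        }
    }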

There's more...

Here is a bit more explanation on some of the design choices you see in this recipe:

AccumuloTableAssistant.java

This class is designed for reuse across different Accumulo data loading and management applications. Since it requires five input strings to operate, the Builder pattern was an obvious choice to prevent constructor calls with accidentally swapped arguments. See Effective Java, Second Edition by Joshua Bloch for more detail on the Builder design pattern.
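
As a rough illustration (the real class is defined in the earlier recipe, so this skeleton is an assumption), the Builder collects the five strings through chained setters and validates them once in build():

    // Illustrative skeleton of the Builder pattern used by
    // AccumuloTableAssistant; details here are assumptions.
    public class AccumuloTableAssistant {

        private final String instanceName, tableName, user, password, zooQuorum;

        private AccumuloTableAssistant(Builder builder) {
            this.instanceName = builder.instanceName;
            this.tableName = builder.tableName;
            this.user = builder.user;
            this.password = builder.password;
            this.zooQuorum = builder.zooQuorum;
        }

        public static class Builder {
            private String instanceName, tableName, user, password, zooQuorum;

            public Builder setInstanceName(String v) { instanceName = v; return this; }
            public Builder setTableName(String v)    { tableName = v;    return this; }
            public Builder setUser(String v)         { user = v;         return this; }
            public Builder setPassword(String v)     { password = v;     return this; }
            public Builder setZooQuorum(String v)    { zooQuorum = v;    return this; }

            public AccumuloTableAssistant build() {
                // One central place to catch a missing setting, instead of a
                // five-string constructor where swapped arguments compile cleanly.
                if (instanceName == null || tableName == null || user == null
                        || password == null || zooQuorum == null) {
                    throw new IllegalStateException("all five settings are required");
                }
                return new AccumuloTableAssistant(this);
            }
        }
    }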

Split points

The choice of 00, 01, 10, and 11 as split points was entirely arbitrary; it was meant to emphasize the importance of presplitting Accumulo tables at creation time. Choosing the right split points depends on the distribution of your rowID ranges. Too few split points, and job throughput will bottleneck at the reduce stage; too many, and you may waste resources and spin-up time on underutilized reduce task JVMs.

AccumuloOutputFormat versus AccumuloFileOutputFormat

If you need to ingest data at massive volume, AccumuloFileOutputFormat is the obvious choice. Producing RFiles for direct insertion into tablets avoids the AccumuloOutputFormat overhead of writing mutations directly to the Accumulo table. On the other hand, if your MapReduce job is not write-intensive, it can be easier to work directly with Mutation instances instead of RFiles. Moreover, if your job requires no reduce phase at all, AccumuloOutputFormat with direct mutations is a much simpler design choice, as the sketch below shows.
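
For comparison, here is a hypothetical map-only sketch that writes Mutation instances directly through AccumuloOutputFormat, using 1.4-era configuration calls; the rowID() helper is a placeholder, not the recipe's ACLEDRowIDGenerator:

    // Hypothetical map-only alternative writing directly to the acled table.
    import java.io.IOException;

    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ACLEDDirectWrite {

        public static class DirectWriteMapper
                extends Mapper<LongWritable, Text, Text, Mutation> {

            private final Text table = new Text("acled");

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] cells = value.toString().split("\t");
                if (cells.length != 8) {
                    return; // counters for malformed lines omitted for brevity
                }
                Mutation m = new Mutation(new Text(rowID(cells)));
                m.put(new Text("cf"), new Text("loc"), new Value(cells[0].getBytes()));
                m.put(new Text("cf"), new Text("dtg"), new Value(cells[1].getBytes()));
                // ...remaining qualifiers follow the same pattern...
                context.write(table, m);
            }

            // Placeholder only; the real recipe uses ACLEDRowIDGenerator.
            private String rowID(String[] cells) {
                return cells[4] + ";" + cells[5] + ";" + cells[1];
            }
        }

        public static void configure(Job job, Configuration conf) {
            job.setMapperClass(DirectWriteMapper.class);
            job.setNumReduceTasks(0); // map-only: no sorting, no RFiles
            job.setOutputFormatClass(AccumuloOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Mutation.class);
            AccumuloOutputFormat.setZooKeeperInstance(conf, "test", "localhost:2181");
            AccumuloOutputFormat.setOutputInfo(conf, "root",
                    "password".getBytes(), true, "acled");
        }
    }

Note that mutations written this way flow through Accumulo's normal write path (memtables and write-ahead logs), which is exactly the overhead the bulk-load approach above avoids.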

See also

  • Designing a row key to store geographic events in Accumulo