Chapter 7. MapReduce API

One advantage of Accumulo’s integration with Hadoop is that MapReduce jobs can read their input from Accumulo tables and write their results back to Accumulo tables. This is useful for ingesting large amounts of data quickly, for analyzing data already stored in Accumulo tables, and for writing data from Accumulo tables out to HDFS.

Formats

Accumulo provides MapReduce input and output formats that read from Accumulo and write to Accumulo directly. There are input and output formats for both MapReduce APIs: org.apache.hadoop.mapred and org.apache.hadoop.mapreduce.

A MapReduce job can read input from an Accumulo table, write output to an Accumulo table, or both.

To configure a MapReduce job to read input from an Accumulo table, use code similar to the following:

job.setInputFormatClass(AccumuloInputFormat.class);

AccumuloInputFormat.setInputTableName(job, "table_name");

ClientConfiguration zkiConfig = new ClientConfiguration()
            .withInstance("myInstance")
            .withZkHosts("zoo1:2181,zoo2:2181");

AccumuloInputFormat.setZooKeeperInstance(job, zkiConfig);
AccumuloInputFormat.setConnectorInfo(job, "username",
    new PasswordToken("password"));

List<Pair<Text,Text>> columns = new ArrayList<>();
columns.add(new Pair<>(new Text("colFam"), new Text("colQual")));
AccumuloInputFormat.fetchColumns(job, columns); // optional

List<Range> ranges = new ArrayList<>();
ranges.add(new Range("a", "k"));
AccumuloInputFormat.setRanges(job, ranges); // optional

AccumuloInputFormat.setScanIsolation(job, true); // optional

AccumuloInputFormat.setScanAuthorizations(job, auths); // optional

The AccumuloInputFormat class takes care of configuring Scanner objects within map workers to deliver the key-value pairs specified in the options.

Internally, each Mapper has a Scanner over a particular range, which provides key-value pairs to the map function. Accumulo will assign each tablet as an InputSplit to a map worker. In addition, Accumulo tries to assign a tablet to a map worker that is running on the same machine that is currently hosting the tablet. This tends to provide the kind of physical data locality that map workers expect for efficient processing.

This behavior can be disabled via the InputFormatBase.setAutoAdjustRanges() method, in which case the MapReduce job will assign one map worker to each Range configured on the input format. If these ranges span tablets, a map worker will end up reading information from more than one tablet, which makes it harder to assign map tasks to machines that have a local copy of tablet data:

InputFormatBase.setAutoAdjustRanges(job, false);

To configure a MapReduce job to output data to an Accumulo table, use the AccumuloOutputFormat class:

job.setOutputFormatClass(AccumuloOutputFormat.class);

ClientConfiguration zkiConfig = new ClientConfiguration()
            .withInstance("myInstance")
            .withZkHosts("zoo1:2181,zoo2:2181");

AccumuloOutputFormat.setZooKeeperInstance(job, zkiConfig);
AccumuloOutputFormat.setConnectorInfo(job, "username",
    new PasswordToken("password"));

BatchWriterConfig config = new BatchWriterConfig();

AccumuloOutputFormat.setBatchWriterOptions(job, config);
AccumuloOutputFormat.setDefaultTableName(job, "table_name");
AccumuloOutputFormat.setCreateTables(job, true); // optional

setCreateTables() tells Accumulo whether or not to create any output tables that do not already exist.

Writing Worker Classes

Mappers over Accumulo tables receive a Key object and a Value object for each map() call:

public static class WordCountMapper extends Mapper<Key,Value,K2,V2> {

  @Override
  public void map(Key k, Value v, Context context) {

  }
}

Accumulo’s InputFormatBase can be extended to provide arbitrary objects of type K,V to a mapper, where K,V can be derived from any number of Key, Value pairs.
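
As a rough sketch of what such an extension might look like, the following hypothetical input format emits the row ID as the key and the value parsed as a double. It follows the same pattern AccumuloInputFormat itself uses, overriding createRecordReader() and building on the RecordReaderBase helper class; the protected member names (scannerIterator, currentK, currentV, numKeysRead) may differ slightly between Accumulo versions, so verify them against the version you are using:

public class DoubleValueInputFormat extends InputFormatBase<Text,DoubleWritable> {

  @Override
  public RecordReader<Text,DoubleWritable> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    return new RecordReaderBase<Text,DoubleWritable>() {
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (scannerIterator.hasNext()) {
          Entry<Key,Value> entry = scannerIterator.next();
          // derive the mapper's key and value from this key-value pair
          currentK = entry.getKey().getRow();
          currentV = new DoubleWritable(
              Double.parseDouble(entry.getValue().toString()));
          numKeysRead++;
          return true;
        }
        return false;
      }
    };
  }
}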

MapReduce jobs that write to Accumulo tables emit a Text object and a Mutation object. When a job writes to just one table, the Text object can be omitted and null passed instead:

public static class WordCountReducer extends Reducer<K,V,Text,Mutation> {

  @Override
  public void reduce(K k, Iterable<V> values, Context context) {
    // process input

    Mutation m = new Mutation(row);
    m.put(colFam, colQual, value);

    context.write(null, m);

  }
}

Each reduce task is backed by a BatchWriter that sends the emitted Text (table name), Mutation pairs to Accumulo.
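
When a job writes to more than one table, the Text key names the destination table for each Mutation. A minimal sketch follows; the table names here are hypothetical, and tables that don't yet exist can be created automatically if setCreateTables() is enabled:

Mutation m = new Mutation("rowID");
m.put("colFam", "colQual", "value");

// write the same mutation to two different tables
context.write(new Text("first_table"), m);
context.write(new Text("second_table"), m);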

MapReduce Example

We’ll run the ubiquitous Word Count example over our Wikipedia articles.

First we’ll create our mapper, combiner, and reducer worker classes, starting with the mapper. Our mapper will read the value of the contents column from our original WikipediaArticles table and break the article text up into individual words, counting the appearance of each word within the document along the way:

public static class WordCountMapper extends Mapper<Key,Value,Text,IntWritable> {

  @Override
  public void map(Key k, Value v, Context context) throws IOException,
      InterruptedException {

    String text = new String(v.get());

    // count words in article
    HashMap<String, Integer> wordCounts = new HashMap<>();
    for (String word :
        text.replaceAll("[^a-zA-Z ]", " ").toLowerCase().split("\s+")) {
      if (!wordCounts.containsKey(word)) {
        wordCounts.put(word, 0);
      }
      wordCounts.put(word, wordCounts.get(word) + 1);
    }

    for (Map.Entry<String, Integer> e : wordCounts.entrySet()) {
      context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
    }
  }
}

Next, we’ll apply a combiner that will sum over the words seen in the documents processed by an individual map worker. This cuts down on the number of key-value pairs that have to be shuffled, sorted, and read by reduce workers. Specifically, this combiner takes a word and a set of partial sums and produces the word and one partial sum:

public static class WordCountCombiner
    extends Reducer<Text,IntWritable,Text,IntWritable> {

  @Override
  public void reduce(Text k, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for(IntWritable v : values) {
      sum += v.get();
    }

    context.write(k, new IntWritable(sum));
  }
}

Finally, our reducer will take all the partial sums from all the map workers and calculate the final count for each word. We will emit a single mutation, which will be written to the output table by AccumuloOutputFormat using an internal BatchWriter. We’ll store the final count as a String representation of an integer in our output table:

public static class WordCountReducer
    extends Reducer<Text,IntWritable,Text,Mutation> {

  @Override
  public void reduce(Text k, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for(IntWritable v : values) {
      sum += v.get();
    }

    Mutation m = new Mutation(k.toString());
    m.put("count", "", Integer.toString(sum));

    context.write(null, m);
  }
}

Now we need to make a driver to configure and run our job. For this job, this will consist of setting up the worker classes, and configuring AccumuloInputFormat and AccumuloOutputFormat:

@Override
public int run(String[] args) throws Exception {

  Job job = Job.getInstance(new Configuration());
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setMapperClass(WordCountMapper.class);
  job.setCombinerClass(WordCountCombiner.class);
  job.setReducerClass(WordCountReducer.class);

  // input
  job.setInputFormatClass(AccumuloInputFormat.class);

  ClientConfiguration zkiConfig = new ClientConfiguration()
          .withInstance(args[0])
          .withZkHosts(args[1]);

  AccumuloInputFormat.setInputTableName(job, WikipediaConstants.ARTICLES_TABLE);
  List<Pair<Text,Text>> columns = new ArrayList<>();
  columns.add(new Pair<>(WikipediaConstants.CONTENTS_FAMILY_TEXT, new Text("")));

  AccumuloInputFormat.fetchColumns(job, columns);
  AccumuloInputFormat.setZooKeeperInstance(job, zkiConfig);
  AccumuloInputFormat.setConnectorInfo(job, args[2], new PasswordToken(args[3]));

  // output
  job.setOutputFormatClass(AccumuloOutputFormat.class);

  BatchWriterConfig config = new BatchWriterConfig();

  AccumuloOutputFormat.setBatchWriterOptions(job, config);
  AccumuloOutputFormat.setZooKeeperInstance(job, zkiConfig);
  AccumuloOutputFormat.setConnectorInfo(job, args[2],
      new PasswordToken(args[3]));
  AccumuloOutputFormat.setDefaultTableName(job,
      WikipediaConstants.WORD_COUNT_TABLE);
  AccumuloOutputFormat.setCreateTables(job, true);

  job.setJarByClass(WordCount.class);

  job.submit();
  return 0;
}
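
The run() signature above implies that WordCount implements Hadoop’s Tool interface. A minimal sketch of the corresponding entry point might look like the following; if you prefer the driver to block until the job finishes and report success or failure, job.waitForCompletion(true) can be used in place of job.submit():

public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new WordCount(), args));
}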

We can run this from within our IDE, or by packaging this up as a JAR and submitting via the mapred command:

mapred jar wordCount.jar

When the job is done we can examine the word counts in the shell:

root@miniInstance> table WikipediaWordCount
root@miniInstance WikipediaWordCount> scan -b accumulo
accumulo count: []    20
achieve count: []    1
achieved count: []    1
achieves count: []    1

MapReduce over Underlying RFiles

Typically, Accumulo uses HDFS to store all data that’s stored in tables. The format of these files is RFile, described in “File formats”.

By design, Accumulo’s files are immutable, meaning their contents cannot be changed. Writing new data and combining old files is done by creating new files. This makes it possible to easily process a consistent snapshot of a table by reading the underlying RFiles.

MapReduce jobs can be run over a set of RFiles for a table. Running MapReduce this way provides a consistent view of a table (something that could also be achieved by reading over a clone of the table), and it also allows the job to avoid using tablet server resources by reading directly from HDFS DataNodes. Jobs can be more efficient for that reason.

To run MapReduce over a set of RFiles for a table, typically users will clone the table beforehand and take the cloned table offline. This will keep the set of RFiles static throughout the time the MapReduce job is running.

The API for cloning a table and taking it offline is as follows:

TableOperations ops = conn.tableOperations();

boolean flush = true;
Map<String,String> propertiesToSet = Collections.EMPTY_MAP;
Set<String> propertiesToExclude = Collections.EMPTY_SET;

ops.clone(originalTable, cloneTable, flush, propertiesToSet,
    propertiesToExclude);

// take the cloned table offline, waiting until the operation is complete
boolean wait = true;
ops.offline(cloneTable, wait);

When we configure the MapReduce job, we simply use the setOfflineTableScan() method when configuring our AccumuloInputFormat:

AccumuloInputFormat.setOfflineTableScan(job, true);

Example of Running a MapReduce Job over RFiles

We’ll run through an example of running a MapReduce job over RFiles using the WordCount class from our previous example.

Our job setup code is almost identical to the previous example, but this time we’ll clone our articles table first, take it offline, then configure our job to use the cloned table’s underlying RFiles:

// clone the articles table
ZooKeeperInstance inst = new ZooKeeperInstance(args[0], args[1]);
Connector conn = inst.getConnector(args[2], new PasswordToken(args[3]));

conn.tableOperations().clone(
  WikipediaConstants.ARTICLES_TABLE,
  WikipediaConstants.ARTICLES_TABLE_CLONE,
  true,
  Collections.EMPTY_MAP,
  Collections.EMPTY_SET);

// take cloned table offline, waiting until the operation is complete
boolean wait = true;
conn.tableOperations().offline(WikipediaConstants.ARTICLES_TABLE_CLONE, wait);

ClientConfiguration zkiConfig = new ClientConfiguration()
  .withInstance(args[0])
  .withZkHosts(args[1]);

// input
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setInputTableName(job,
    WikipediaConstants.ARTICLES_TABLE_CLONE);
List<Pair<Text,Text>> columns = new ArrayList<>();
columns.add(new Pair<>(WikipediaConstants.CONTENTS_FAMILY_TEXT, new Text("")));

AccumuloInputFormat.fetchColumns(job, columns);
AccumuloInputFormat.setZooKeeperInstance(job, zkiConfig);
AccumuloInputFormat.setConnectorInfo(job, args[2], new PasswordToken(args[3]));

// configure to use underlying RFiles
AccumuloInputFormat.setOfflineTableScan(job, true);

We run this job as we did our previous example, either from within the IDE, or by building a JAR and using the mapred command:

mapred jar mapReduceFilesExample.jar

Delivering Rows to Map Workers

In our previous examples, it was only necessary for us to receive one key-value pair in each map task. It may be necessary for each call to the map method to receive a row containing multiple columns instead. To configure a MapReduce job to deliver rows to the map method we could set the WholeRowIterator on our AccumuloInputFormat and then decode each row into multiple key-value pairs inside our map function definition, but there is another input format we can use that will do this work for us.

AccumuloRowInputFormat will deliver a row ID as the key to a mapper, and a PeekingIterator<Entry<Key,Value>> as the value. The peeking iterator will contain the key-value pairs within this row, in sorted order.

Our mapper can then process individual columns within a row like this:

public void map(Text rowID, PeekingIterator<Entry<Key,Value>> value,
    Context context) {
  Entry<Key,Value> entry = value.next();
  // process this column

  entry = value.next();
  // process this column, etc
}
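
Configuring a job to use this input format is much like configuring AccumuloInputFormat, because AccumuloRowInputFormat shares the same configuration methods. A minimal sketch, reusing the instance, credentials, and table name placeholders from earlier examples:

job.setInputFormatClass(AccumuloRowInputFormat.class);

AccumuloRowInputFormat.setInputTableName(job, "table_name");
AccumuloRowInputFormat.setZooKeeperInstance(job, zkiConfig);
AccumuloRowInputFormat.setConnectorInfo(job, "username",
    new PasswordToken("password"));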

Ingesters and Combiners as MapReduce Computations

The MapReduce programming model is designed for batch computation rather than incremental computation. For example, when calculating word counts over a set of 10,000 documents, a MapReduce job would read all the documents and calculate how many times each word appears. If we then add a single new document to the corpus, we either must read in all the original 10,000 documents again along with the new document, or read all the previous word counts and add the counts from the one new document to the existing counts (Figure 7-1).

Figure 7-1. Updating word count results

Either option is a lot of work to add just one document.

As a result, incrementally updating a result set such as this in an efficient way tends to be done by waiting until there are a substantial number of new documents before updating the result set, the cost of which is that the result set is not updated very often.

In contrast, Accumulo’s combiners can be used to incrementally update a result set much more efficiently. In MapReduce, you can specify a combiner class that will be used to combine together intermediate output from the map phase before it is sent to the reduce phase. You can think of Accumulo’s combiners as performing a similar function.

In the word count example, the MapReduce job maps over documents and outputs word,1 for each word in the document. A combiner sums up the word counts for each mapper and sends those intermediate counts to a reducer, which tallies the final counts. In this simplest MapReduce use case, the same class is used for the reducer and the combiner. To perform a word count in Accumulo, you can configure a LongCombiner on the table and insert entries with row word and value 1 (Figure 7-2). After the data is written into Accumulo, the computation is complete.

Figure 7-2. Updating word count results incrementally

An example of configuring a table this way is as follows:

IteratorSetting iterSet = new IteratorSetting(
  10,
  "summingCombiner",
  org.apache.accumulo.core.iterators.user.SummingCombiner.class.getName());

SummingCombiner.setEncodingType(iterSet, SummingCombiner.Type.LONG);

List<IteratorSetting.Column> columns = new ArrayList<>();
columns.add(new IteratorSetting.Column(new Text("colFam"), new Text("colQual")));
SummingCombiner.setColumns(iterSet, columns);

// or instead, to apply combiner to all columns
// SummingCombiner.setCombineAllColumns(iterSet, true);

conn.tableOperations().attachIterator("table_name", iterSet);

In the class WordCountIngester we can perform the work our previous WordCountMapper performed:

String wikitext = article.getText();
String plaintext = model.render(converter, wikitext)
    .replace("{{", " ")
    .replace("}}", " ");

// count words in article
HashMap<String, Integer> wordCounts = new HashMap<>();
for (String word :
    plaintext.replaceAll("[^a-zA-Z ]", " ").toLowerCase().split("\\s+")) {
  if(!wordCounts.containsKey(word)) {
    wordCounts.put(word, 0);
  }

  wordCounts.put(word, wordCounts.get(word) + 1);
}

try {
  for (Map.Entry<String, Integer> e : wordCounts.entrySet()) {
    Mutation m = new Mutation(e.getKey());
    m.put("counts", "", e.getValue().toString());

    batchWriter.addMutation(m);
  }
} catch (MutationsRejectedException e) {
  e.printStackTrace();
}

The SummingCombiner will perform the final reduce function for us. We set up the table as follows:

if (!conn.tableOperations().exists(WikipediaConstants.WORD_COUNT_TABLE)) {
  conn.tableOperations().create(WikipediaConstants.WORD_COUNT_TABLE);

  // configure combiner
  IteratorSetting iterSet = new IteratorSetting(
    10,
    "summingCombiner",
    org.apache.accumulo.core.iterators.user.SummingCombiner.class.getName());

  SummingCombiner.setEncodingType(iterSet, SummingCombiner.Type.STRING);

  List<IteratorSetting.Column> columns = new ArrayList<>();
  columns.add(new IteratorSetting.Column(new Text("counts"), new Text("")));
  SummingCombiner.setColumns(iterSet, columns);

  conn.tableOperations().attachIterator(WikipediaConstants.WORD_COUNT_TABLE,
      iterSet);
}

Computations that assume they have seen all the values for a particular key, such as the final divide in a running average, are typically performed at scan time by an iterator, and their results are not persisted in the table.
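
For example, a running average could be maintained by keeping a running sum and a running count in two columns, each updated by a SummingCombiner, with the final divide performed when the data is read. The following sketch does the divide in client code at scan time rather than in a custom iterator; the table and column family names are hypothetical:

// read the combined sum and count for one row and compute the average
Scanner scanner = conn.createScanner("averages_table", auths);
scanner.setRange(Range.exact("metric_a"));

long sum = 0;
long count = 0;
for (Map.Entry<Key,Value> entry : scanner) {
  String family = entry.getKey().getColumnFamily().toString();
  if (family.equals("sum")) {
    sum = Long.parseLong(entry.getValue().toString());
  } else if (family.equals("count")) {
    count = Long.parseLong(entry.getValue().toString());
  }
}

double average = count == 0 ? 0.0 : (double) sum / count;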

MapReduce and Bulk Import

In some cases, rather than writing data to Accumulo incrementally, an application will want to provide a set of new files to Accumulo all at once. A MapReduce output format, the AccumuloFileOutputFormat, is provided for creating a set of files in the RFile format for bulk import into Accumulo. See “File formats” for details on the RFile format.

The most efficient way to create these RFiles is for each one to contain a single contiguous range of key-value pairs that does not overlap with the ranges of any other RFile. That way, when the files are introduced to the existing tablets of an Accumulo table, each RFile will contain data destined for only one, or at most a few, tablets. Using the RangePartitioner helps ensure this property of the output RFiles.

To configure a job to use the RangePartitioner:

job.setPartitionerClass(RangePartitioner.class);
RangePartitioner.setSplitFile(job, "/jobconfig/splitsFile.txt");

The splits file should be a file in HDFS that contains one Base64-encoded split point per line.
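
One way to produce such a file is to write out the destination table’s existing split points, Base64-encoding each one. A rough sketch, assuming a Connector named conn, Java 8’s java.util.Base64, and the /jobconfig/splitsFile.txt path used above:

// write the table's current split points, Base64-encoded, one per line
Collection<Text> splits = conn.tableOperations().listSplits("table_name");

FileSystem fs = FileSystem.get(job.getConfiguration());
try (PrintStream out = new PrintStream(
    fs.create(new Path("/jobconfig/splitsFile.txt")))) {
  for (Text split : splits) {
    out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
  }
}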

Each Reducer will create a separate RFile, and data must be output from the reduce method in sorted order. For example, a Reducer might take the following form:

public static class ReduceClass extends Reducer<Text,Text,Key,Value> {

  public void reduce(Text key, Iterable<Text> values, Context output)
    throws IOException, InterruptedException {

    for (Text value : values) {
      // create outputKey and outputValue
      output.write(outputKey, outputValue);
    }
  }
}

We’re not emitting a Text and Mutation object, as is done with the AccumuloOutputFormat, but rather Key and Value objects.

If the for loop does not create output keys in sorted order, you can instead insert the Key, Value pairs into a TreeMap in the for loop, and then iterate over the TreeMap to do the output writes at the end of the reduce method.
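
A sketch of that approach, using the same placeholder outputKey and outputValue as above:

TreeMap<Key,Value> buffered = new TreeMap<>();

for (Text value : values) {
  // create outputKey and outputValue as before
  buffered.put(outputKey, outputValue);
}

// keys come back out of the TreeMap in sorted order
for (Map.Entry<Key,Value> e : buffered.entrySet()) {
  output.write(e.getKey(), e.getValue());
}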

Once our job is finished we can import the RFiles via the importDirectory() method:

boolean setTimestamps = true;
conn.tableOperations().importDirectory("table_name", "/inputFiles",
    "/failedFiles", setTimestamps);

This will move the files into directories associated with the table specified and introduce them to existing tablets.
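
Note that the failure directory is expected to exist, and be empty, before the import begins. A small sketch of creating it first, using the same hypothetical path as above:

// create an empty failure directory in HDFS before calling importDirectory()
FileSystem fs = FileSystem.get(new Configuration());
fs.mkdirs(new Path("/failedFiles"));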

See “Bulk-loading files from a MapReduce job” for details on using the Accumulo shell to bulk-load files created from MapReduce jobs.

Bulk Ingest to Avoid Duplicates

Another reason to use bulk import is to avoid writing duplicate entries into Accumulo tables when a large number of clients are used to write data. The more clients involved in writing data, the higher the chance that one can fail. If clients are simply writing data to Accumulo in response to individual user write requests, this may not be much of a problem. Applications can use conventional load balancers to find a live client and write their data.

However, in a scenario in which clients are writing information from a set of files, for example, the loss of a client makes it likely that only a portion of a file was ingested. If another client is directed to reingest the file, there is a chance that it will create duplicate entries in the table.

One way to avoid this is to make the key-value pairs written for each piece of input data deterministic. That is to say, each input record is converted into the same set of key-value pairs no matter when or which client is ingesting the record. This can still result in the same key-value pair getting written more than once, but the VersioningIterator can be configured to ignore all but the latest version of a key-value pair, effectively eliminating duplicates.
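
New tables come with the VersioningIterator already configured to keep only one version, so no extra setup is usually required; if that default has been changed, something like the following sketch would restore it for a hypothetical table_name:

// keep only the newest version of each key at scan, minor-compaction,
// and major-compaction time
for (String scope : new String[] {"scan", "minc", "majc"}) {
  conn.tableOperations().setProperty("table_name",
      "table.iterator." + scope + ".vers.opt.maxVersions", "1");
}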

Sometimes creating deterministic key-value pairs is not an option. For example, an application may want to create key-value pairs for an input record that use the timestamp of when the data was ingested as part of the row ID. This would allow data to be read from Accumulo roughly in the order in which it arrived. For more discussion on storing data in time order, see Chapter 9.

In this case, reloading some input records from a partially processed input file would result in duplicate records with different row IDs. Using MapReduce and bulk loading would avoid loading in any key-value pairs from a file that was partially processed when the machine processing it suffered a failure. This can also allow for loading some set of key-value pairs all together as an atomic unit as each RFile is either completed and loaded or discarded, so that another worker can produce a complete file.
