One advantage of Accumulo’s integration with Hadoop is that MapReduce jobs can be made to read input from Accumulo tables and also to write results to Accumulo tables. This can be done for ingesting a large amount of data quickly, for analyzing data in Accumulo tables, or for outputting data from Accumulo tables to HDFS.
Accumulo provides MapReduce input and output formats that read from Accumulo and write to Accumulo directly.
There are input and output formats for both MapReduce APIs: org.apache.hadoop.mapred and org.apache.hadoop.mapreduce.
A MapReduce job can read input from an Accumulo table, write output to an Accumulo table, or both.
To configure a MapReduce job to read input from an Accumulo table, use code similar to the following:
    job.setInputFormatClass(AccumuloInputFormat.class);
    AccumuloInputFormat.setInputTableName(job, "table_name");

    ClientConfiguration zkiConfig = new ClientConfiguration()
        .withInstance("myInstance")
        .withZkHosts("zoo1:2181,zoo2:2181");
    AccumuloInputFormat.setZooKeeperInstance(job, zkiConfig);
    AccumuloInputFormat.setConnectorInfo(job, "username", new PasswordToken("password"));

    List<Pair<Text,Text>> columns = new ArrayList<>();
    columns.add(new Pair<>(new Text("colFam"), new Text("colQual")));
    AccumuloInputFormat.fetchColumns(job, columns); // optional

    List<Range> ranges = new ArrayList<>();
    ranges.add(new Range("a", "k"));
    AccumuloInputFormat.setRanges(job, ranges); // optional

    AccumuloInputFormat.setScanIsolation(job, true); // optional
    AccumuloInputFormat.setScanAuthorizations(job, auths); // optional
The AccumuloInputFormat class takes care of configuring Scanner objects within map workers to deliver the key-value pairs specified in the options.
Internally, each Mapper has a Scanner over a particular range, which provides key-value pairs to the map function.
Accumulo will assign each tablet as an InputSplit to a map worker.
In addition, Accumulo tries to assign a tablet to a map worker that is running on the same machine that is currently hosting the tablet.
This tends to provide the kind of physical data locality that map workers expect for efficient processing.
This behavior can be disabled via the InputFormatBase.setAutoAdjustRanges() method, in which case the MapReduce job will assign one map worker to each Range configured on the input format.
If these ranges span tablets, a map worker will end up reading information from more than one tablet, which makes it harder to assign map tasks to machines that have a local copy of tablet data:
    InputFormatBase.setAutoAdjustRanges(job, false);
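As a rough illustration of what adjusting ranges to tablet boundaries means, the following plain-Java sketch (no Accumulo classes involved) clips a requested row range at tablet split points so that each resulting piece lies within a single tablet. The class RangeClipper, its method names, and the use of string rows with exclusive end points are assumptions made for illustration; this is not how Accumulo implements the feature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class RangeClipper {

  /**
   * Splits the row range [start, end) at every split point that falls
   * strictly inside it. Each returned element is a two-element array {from, to}.
   */
  static List<String[]> clip(String start, String end, TreeSet<String> splitPoints) {
    List<String[]> pieces = new ArrayList<>();
    String from = start;
    // subSet(start, false, end, false) yields split points strictly between start and end
    for (String split : splitPoints.subSet(start, false, end, false)) {
      pieces.add(new String[] {from, split});
      from = split;
    }
    pieces.add(new String[] {from, end});
    return pieces;
  }

  public static void main(String[] args) {
    TreeSet<String> splits = new TreeSet<>();
    splits.add("f");
    splits.add("m");
    for (String[] piece : clip("a", "k", splits)) {
      System.out.println(piece[0] + " .. " + piece[1]);
    }
  }
}
```

With split points f and m, the range a to k is cut into two pieces, a to f and f to k, each of which could be handed to a map worker close to the tablet that holds it. With adjustment disabled, the whole range would go to a single map worker.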
To configure a MapReduce job to output data to an Accumulo table, use the AccumuloOutputFormat class:
    job.setOutputFormatClass(AccumuloOutputFormat.class);

    ClientConfiguration zkiConfig = new ClientConfiguration()
        .withInstance("myInstance")
        .withZkHosts("zoo1:2181,zoo2:2181");
    AccumuloOutputFormat.setZooKeeperInstance(job, zkiConfig);
    AccumuloOutputFormat.setConnectorInfo(job, "username", new PasswordToken("password"));

    BatchWriterConfig config = new BatchWriterConfig();
    AccumuloOutputFormat.setBatchWriterOptions(job, config);

    AccumuloOutputFormat.setDefaultTableName(job, "table_name");
    AccumuloOutputFormat.setCreateTables(job, true); // optional
Mappers over Accumulo tables receive a Key object and a Value object for each map() call:
    public static class WordCountMapper extends Mapper<Key,Value,K2,V2> {

      @Override
      public void map(Key k, Value v, Context context) {
      }
    }
Accumulo’s InputFormatBase can be extended to provide arbitrary objects of type K,V to a mapper, where K,V can be derived from any number of Key, Value pairs.
MapReduce jobs that write to Accumulo tables emit a Text object and a Mutation object.
When a job writes to just one table, the Text object can be omitted and null passed instead:
    public static class WordCountReducer extends Reducer<K,V,Text,Mutation> {

      @Override
      public void reduce(K k, Iterable<V> values, Context context)
          throws IOException, InterruptedException {

        // process input

        Mutation m = new Mutation(row);
        m.put(colFam, colQual, value);
        context.write(null, m);
      }
    }
Each Reducer has a BatchWriter that sends data to Accumulo via Text (table name), Mutation pairs.
We’ll run the ubiquitous Word Count example over our Wikipedia articles.
First we’ll create our mapper, combiner, and reducer worker classes, starting with the mapper. Our mapper will read the value of the contents column from our original WikipediaArticles table and break the article text up into individual words, counting the appearance of each word within the document along the way:
    public static class WordCountMapper extends Mapper<Key,Value,Text,IntWritable> {

      @Override
      public void map(Key k, Value v, Context context)
          throws IOException, InterruptedException {

        String text = new String(v.get());

        // count words in article
        HashMap<String,Integer> wordCounts = new HashMap<>();
        for (String word : text.replaceAll("[^a-zA-Z ]", " ").toLowerCase().split("\\s+")) {
          if (!wordCounts.containsKey(word)) {
            wordCounts.put(word, 0);
          }
          wordCounts.put(word, wordCounts.get(word) + 1);
        }

        for (Map.Entry<String,Integer> e : wordCounts.entrySet()) {
          context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
        }
      }
    }
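The tokenizing logic in the mapper can be tried outside of MapReduce. The following sketch repeats the same cleanup steps on a plain String. The class name WordTokenizer is made up for this illustration, and it additionally skips the empty token that split() produces when the text begins with a separator, which the mapper above does not do.

```java
import java.util.Map;
import java.util.TreeMap;

final class WordTokenizer {

  /** Counts words in the text using the same cleanup steps as the mapper. */
  static Map<String, Integer> countWords(String text) {
    Map<String, Integer> counts = new TreeMap<>();
    // replace everything except ASCII letters and spaces, lowercase, split on whitespace
    for (String word : text.replaceAll("[^a-zA-Z ]", " ").toLowerCase().split("\\s+")) {
      if (word.isEmpty()) {
        continue; // split() yields a leading "" when the text starts with a separator
      }
      Integer previous = counts.get(word);
      counts.put(word, previous == null ? 1 : previous + 1);
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(countWords("Accumulo stores data; Accumulo sorts data."));
  }
}
```

A TreeMap is used here only so that the printed result is in a stable, sorted order; the mapper itself has no need for sorted output because MapReduce sorts keys during the shuffle.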
Next, we’ll apply a combiner that will sum over the words seen in the documents processed by an individual map worker. This cuts down on the number of key-value pairs that have to be shuffled, sorted, and read by reduce workers. Specifically, this combiner takes a word and a set of partial sums and produces the word and one partial sum:
    public static class WordCountCombiner
        extends Reducer<Text,IntWritable,Text,IntWritable> {

      @Override
      public void reduce(Text k, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable v : values) {
          sum += v.get();
        }

        context.write(k, new IntWritable(sum));
      }
    }
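To see why this reduces shuffle volume, the sketch below simulates, in plain Java with no Hadoop classes, the output of one map worker being combined before it is sent on. The class CombinerSketch and its input data are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CombinerSketch {

  /** Collapses (word, count) pairs from one map worker into one partial sum per word. */
  static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapOutput) {
    Map<String, Integer> partialSums = new TreeMap<>();
    for (Map.Entry<String, Integer> pair : mapOutput) {
      Integer soFar = partialSums.get(pair.getKey());
      partialSums.put(pair.getKey(), soFar == null ? pair.getValue() : soFar + pair.getValue());
    }
    return partialSums;
  }

  public static void main(String[] args) {
    // pairs emitted by one map worker while processing three documents
    List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
    mapOutput.add(new SimpleEntry<>("accumulo", 20));
    mapOutput.add(new SimpleEntry<>("table", 3));
    mapOutput.add(new SimpleEntry<>("accumulo", 5));
    mapOutput.add(new SimpleEntry<>("table", 1));
    mapOutput.add(new SimpleEntry<>("accumulo", 2));

    Map<String, Integer> combined = combine(mapOutput);
    System.out.println(mapOutput.size() + " pairs before, " + combined.size() + " after: " + combined);
  }
}
```

Five pairs collapse into two, one per distinct word, and the reducer still arrives at the same totals because addition can be applied to partial sums in any grouping.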
Finally, our reducer will take all the partial sums from all the map workers and calculate the final count for each word.
We will emit a single mutation, which will be written to the output table by AccumuloOutputFormat using an internal BatchWriter.
We’ll store the final count as a String representation of an integer in our output table:
    public static class WordCountReducer
        extends Reducer<Text,IntWritable,Text,Mutation> {

      @Override
      public void reduce(Text k, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable v : values) {
          sum += v.get();
        }

        Mutation m = new Mutation(k.toString());
        m.put("count", "", Integer.toString(sum));

        context.write(null, m);
      }
    }
Now we need to make a driver to configure and run our job.
For this job, this will consist of setting up the worker classes and configuring AccumuloInputFormat and AccumuloOutputFormat:
    @Override
    public int run(String[] args) throws Exception {

      Job job = Job.getInstance(new Configuration());

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      job.setMapperClass(WordCountMapper.class);
      job.setCombinerClass(WordCountCombiner.class);
      job.setReducerClass(WordCountReducer.class);

      // input
      job.setInputFormatClass(AccumuloInputFormat.class);

      ClientConfiguration zkiConfig = new ClientConfiguration()
          .withInstance(args[0])
          .withZkHosts(args[1]);

      AccumuloInputFormat.setInputTableName(job, WikipediaConstants.ARTICLES_TABLE);
      List<Pair<Text,Text>> columns = new ArrayList<>();
      columns.add(new Pair(WikipediaConstants.CONTENTS_FAMILY_TEXT, new Text("")));

      AccumuloInputFormat.fetchColumns(job, columns);
      AccumuloInputFormat.setZooKeeperInstance(job, zkiConfig);
      AccumuloInputFormat.setConnectorInfo(job, args[2], new PasswordToken(args[3]));

      // output
      job.setOutputFormatClass(AccumuloOutputFormat.class);

      BatchWriterConfig config = new BatchWriterConfig();

      AccumuloOutputFormat.setBatchWriterOptions(job, config);
      AccumuloOutputFormat.setZooKeeperInstance(job, zkiConfig);
      AccumuloOutputFormat.setConnectorInfo(job, args[2], new PasswordToken(args[3]));
      AccumuloOutputFormat.setDefaultTableName(job, WikipediaConstants.WORD_COUNT_TABLE);
      AccumuloOutputFormat.setCreateTables(job, true);

      job.setJarByClass(WordCount.class);

      job.submit();
      return 0;
    }
We can run this from within our IDE, or by packaging this up as a JAR and submitting via the mapred command:
    mapred jar wordCount.jar
When the job is done we can examine the word counts in the shell:
    root@miniInstance> table WikipediaWordCount
    root@miniInstance WikipediaWordCount> scan -b accumulo
    accumulo count: []    20
    achieve count: []    1
    achieved count: []    1
    achieves count: []    1
Typically, Accumulo stores all table data in HDFS. The format of these files is RFile, described in “File formats”.
By design, Accumulo’s files are immutable, meaning their contents cannot be changed. Writing new data and combining old files is done by creating new files. This makes it possible to easily process a consistent snapshot of a table by reading the underlying RFiles.
MapReduce jobs can be run over a set of RFiles for a table. Doing MapReduce in this way not only provides a consistent view of a table, which could also be done by reading over a clone of a table, but it also allows the MapReduce job to avoid using resources of tablet servers by reading directly from data nodes. The jobs can be more efficient for that reason.
To run MapReduce over a set of RFiles for a table, typically users will clone the table beforehand and take the cloned table offline. This will keep the set of RFiles static throughout the time the MapReduce job is running.
The API for cloning a table and taking it offline is as follows:
    TableOperations ops = conn.tableOperations();

    boolean flush = true;
    Map<String,String> propertiesToSet = Collections.EMPTY_MAP;
    Set<String> propertiesToExclude = Collections.EMPTY_SET;

    ops.clone(originalTable, cloneTable, flush, propertiesToSet, propertiesToExclude);

    // take the clone offline, waiting until the operation is complete
    ops.offline(cloneTable, true);
When we configure the MapReduce job, we simply call the setOfflineTableScan() method on our AccumuloInputFormat:
    AccumuloInputFormat.setOfflineTableScan(job, true);
We’ll run through an example of running a MapReduce job over RFiles using the WordCount class from our previous example.
Our job setup code is almost identical to the previous example, but this time we’ll clone our articles table first, take it offline, then configure our job to use the cloned table’s underlying RFiles:
    // clone the articles table
    ZooKeeperInstance inst = new ZooKeeperInstance(args[0], args[1]);
    Connector conn = inst.getConnector(args[2], new PasswordToken(args[3]));

    conn.tableOperations().clone(
        WikipediaConstants.ARTICLES_TABLE,
        WikipediaConstants.ARTICLES_TABLE_CLONE,
        true,
        Collections.EMPTY_MAP,
        Collections.EMPTY_SET);

    // take cloned table offline, waiting until the operation is complete
    boolean wait = true;
    conn.tableOperations().offline(WikipediaConstants.ARTICLES_TABLE_CLONE, wait);

    ClientConfiguration zkiConfig = new ClientConfiguration()
        .withInstance(args[0])
        .withZkHosts(args[1]);

    // input
    job.setInputFormatClass(AccumuloInputFormat.class);
    AccumuloInputFormat.setInputTableName(job, WikipediaConstants.ARTICLES_TABLE_CLONE);
    List<Pair<Text,Text>> columns = new ArrayList<>();
    columns.add(new Pair(WikipediaConstants.CONTENTS_FAMILY_TEXT, new Text("")));

    AccumuloInputFormat.fetchColumns(job, columns);
    AccumuloInputFormat.setZooKeeperInstance(job, zkiConfig);
    AccumuloInputFormat.setConnectorInfo(job, args[2], new PasswordToken(args[3]));

    // configure to use underlying RFiles
    AccumuloInputFormat.setOfflineTableScan(job, true);
We run this job as we did our previous example, either from within the IDE, or by building a JAR and using the mapred command:
mapred jar mapReduceFilesExample.jar
In our previous examples, it was only necessary for us to receive one key-value pair in each call to the map method.
It may be necessary for each call to the map method to receive a row containing multiple columns instead.
To configure a MapReduce job to deliver rows to the map method we could set the WholeRowIterator on our AccumuloInputFormat and then decode each row into multiple key-value pairs inside our map function definition, but there is another input format we can use that will do this work for us.
AccumuloRowInputFormat will deliver a row ID as the key to a mapper, and a PeekingIterator<Entry<Key,Value>> as the value.
The peeking iterator will contain the key-value pairs within this row, in sorted order.
Our mapper can then process individual columns within a row like this:
    public void map(Text rowID, PeekingIterator<Entry<Key,Value>> value, Context context) {

      Entry<Key,Value> entry = value.next();
      // process this column

      entry = value.next();
      // process this column, etc
    }
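Conceptually, delivering rows means grouping consecutive sorted entries that share a row ID. The following plain-Java sketch shows that grouping on simple strings. RowGrouper and the three-element {row, column, value} arrays are assumptions for illustration and are unrelated to the classes Accumulo actually uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowGrouper {

  /**
   * Groups sorted {row, column, value} entries by row, preserving order.
   * Because the input is sorted, all entries of a row are adjacent.
   */
  static Map<String, List<String[]>> groupByRow(List<String[]> sortedEntries) {
    Map<String, List<String[]>> rows = new LinkedHashMap<>();
    for (String[] entry : sortedEntries) {
      List<String[]> columns = rows.get(entry[0]);
      if (columns == null) {
        columns = new ArrayList<>();
        rows.put(entry[0], columns);
      }
      columns.add(entry);
    }
    return rows;
  }

  public static void main(String[] args) {
    List<String[]> entries = new ArrayList<>();
    entries.add(new String[] {"article1", "contents", "some text"});
    entries.add(new String[] {"article1", "metadata:id", "1"});
    entries.add(new String[] {"article2", "contents", "other text"});

    for (Map.Entry<String, List<String[]>> row : groupByRow(entries).entrySet()) {
      System.out.println(row.getKey() + " has " + row.getValue().size() + " column(s)");
    }
  }
}
```

Each group corresponds to what a single map() call would receive: the row ID as the key and the columns of that row, in sorted order, as the value.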
The MapReduce programming model is designed for batch computation rather than incremental computation. For example, when calculating word counts over a set of 10,000 documents, a MapReduce job would read all the documents and calculate how many times each word appears. If we then add a single new document to the corpus, we either must read in all the original 10,000 documents again along with the new document, or read all the previous word counts and add the counts from the one new document to the existing counts (Figure 7-1).
Either option is a lot of work to add just one document.
As a result, incrementally updating a result set such as this in an efficient way tends to be done by waiting until there are a substantial number of new documents before updating the result set, the cost of which is that the result set is not updated very often.
In contrast, Accumulo’s combiners can be used to incrementally update a result set much more efficiently. In MapReduce, you can specify a combiner class that will be used to combine together intermediate output from the map phase before it is sent to the reduce phase. You can think of Accumulo’s combiners as performing a similar function.
In the word count example, the MapReduce job maps over documents and outputs word,1 for each word in the document.
A combiner sums up the word counts for each mapper and sends those intermediate counts to a reducer, which tallies the final counts.
In this simplest MapReduce use case, the same class is used for the reducer and the combiner.
To perform a word count in Accumulo, you can configure a LongCombiner on the table and insert entries with row word and value 1 (Figure 7-2).
After the data is written into Accumulo, the computation is complete.
An example of configuring a table this way is as follows:
    IteratorSetting iterSet = new IteratorSetting(
        10,
        "summingCombiner",
        org.apache.accumulo.core.iterators.user.SummingCombiner.class.getName());

    // values are stored as strings; LongCombiner.Type also offers VARLEN and FIXEDLEN
    SummingCombiner.setEncodingType(iterSet, SummingCombiner.Type.STRING);

    List<IteratorSetting.Column> columns = new ArrayList<>();
    columns.add(new IteratorSetting.Column(new Text("colFam"), new Text("colQual")));
    SummingCombiner.setColumns(iterSet, columns);

    // or instead, to apply combiner to all columns
    // SummingCombiner.setCombineAllColumns(iterSet, true);

    conn.tableOperations().attachIterator("table_name", iterSet);
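The effect of a summing combiner on incremental updates can be imitated in plain Java. In this sketch, which uses no Accumulo classes, a TreeMap stands in for the table and addition at write time stands in for the combiner. The class SummingTableSketch and its methods are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class SummingTableSketch {

  // row (word) -> combined count; stands in for a table with a summing combiner
  private final Map<String, Long> table = new TreeMap<>();

  /** Writes one entry; the stand-in combiner adds it to whatever is already stored. */
  void write(String word, long count) {
    Long existing = table.get(word);
    table.put(word, existing == null ? count : existing + count);
  }

  /** Ingests a document by writing a 1 for every word; no earlier document is re-read. */
  void ingest(String document) {
    for (String word : document.toLowerCase().split("\\s+")) {
      if (!word.isEmpty()) {
        write(word, 1L);
      }
    }
  }

  long count(String word) {
    Long stored = table.get(word);
    return stored == null ? 0L : stored;
  }

  public static void main(String[] args) {
    SummingTableSketch counts = new SummingTableSketch();
    counts.ingest("accumulo sorts keys");
    counts.ingest("accumulo stores keys and values"); // added later, on its own
    System.out.println("accumulo=" + counts.count("accumulo") + " values=" + counts.count("values"));
  }
}
```

Adding the second document touches only the words it contains. In Accumulo the addition happens lazily, when entries are scanned or compacted, rather than at write time as in this sketch.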
In the class WordCountIngester we can perform the work our previous WordCountMapper performed:
    String wikitext = article.getText();
    String plaintext = model.render(converter, wikitext)
        .replace("{{", " ")
        .replace("}}", " ");

    // count words in article
    HashMap<String,Integer> wordCounts = new HashMap<>();
    for (String word : plaintext.replaceAll("[^a-zA-Z ]", " ").toLowerCase().split("\\s+")) {
      if (!wordCounts.containsKey(word)) {
        wordCounts.put(word, 0);
      }
      wordCounts.put(word, wordCounts.get(word) + 1);
    }

    try {
      for (Map.Entry<String,Integer> e : wordCounts.entrySet()) {
        Mutation m = new Mutation(e.getKey());
        m.put("counts", "", e.getValue().toString());
        batchWriter.addMutation(m);
      }
    } catch (MutationsRejectedException e) {
      e.printStackTrace();
    }
The SummingCombiner will perform the final reduce function for us. We set up the table as follows:
    if (!conn.tableOperations().exists(WikipediaConstants.WORD_COUNT_TABLE)) {
      conn.tableOperations().create(WikipediaConstants.WORD_COUNT_TABLE);

      // configure combiner
      IteratorSetting iterSet = new IteratorSetting(
          10,
          "summingCombiner",
          org.apache.accumulo.core.iterators.user.SummingCombiner.class.getName());

      SummingCombiner.setEncodingType(iterSet, SummingCombiner.Type.STRING);

      List<IteratorSetting.Column> columns = new ArrayList<>();
      columns.add(new IteratorSetting.Column(new Text("counts"), new Text("")));
      SummingCombiner.setColumns(iterSet, columns);

      conn.tableOperations().attachIterator(WikipediaConstants.WORD_COUNT_TABLE, iterSet);
    }
The final results of a reduce computation that assumes it has seen all the values for a particular key would typically be performed by a scan-time iterator and are not persisted in the table. An example of a computation that might be performed at scan time is the final divide in a running average.
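As a sketch of the running-average case, the plain-Java class below keeps only a sum and a count, both of which can be combined incrementally, and performs the divide when the value is read. RunningAverage is a hypothetical stand-in; in Accumulo the stored pair would be maintained by a combiner and the divide would be done by a scan-time iterator.

```java
final class RunningAverage {

  private long sum;    // combinable: partial sums can be added in any order
  private long count;  // combinable: partial counts can be added in any order

  /** Folds in one new observation, as a combiner would when new data arrives. */
  void add(long value) {
    sum += value;
    count++;
  }

  /** Merges another partial result, for example one produced by a different writer. */
  void merge(RunningAverage other) {
    sum += other.sum;
    count += other.count;
  }

  /** The final divide, done only at read time and never stored. */
  double average() {
    if (count == 0) {
      throw new IllegalStateException("no values recorded");
    }
    return (double) sum / count;
  }

  public static void main(String[] args) {
    RunningAverage a = new RunningAverage();
    a.add(4);
    a.add(8);
    RunningAverage b = new RunningAverage();
    b.add(6);
    a.merge(b);
    System.out.println(a.average()); // (4 + 8 + 6) / 3
  }
}
```

Storing the quotient itself would not work, because two averages cannot be merged correctly without knowing how many values each one covers.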
In some cases, rather than writing data to Accumulo incrementally, an application will want to provide a set of new files to Accumulo all at once.
A MapReduce output format, the AccumuloFileOutputFormat, is provided for creating a set of files in the RFile format for bulk import into Accumulo.
See “File formats” for details on the RFile format.
The most efficient way to create these RFiles is for them to each contain one continuous range of key-value pairs that doesn’t overlap with any other RFile’s key-value pairs.
This is so that when these files are introduced to existing tablets in an Accumulo table, only one or maybe two tablets will require data in each RFile.
Using the RangePartitioner is important to ensuring this property of the output RFiles.
To configure a job to use the RangePartitioner:
    job.setPartitionerClass(RangePartitioner.class);
    RangePartitioner.setSplitFile(job, "/jobconfig/splitsFile.txt");
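The idea behind range partitioning can be sketched in plain Java: each key is sent to the partition whose split-point interval contains it, so every reducer sees one contiguous key range. SplitPartitioner below is a hypothetical illustration using binary search over sorted split points. It is not the RangePartitioner source, and the rule that a key equal to a split point goes to the lower partition is an assumption.

```java
import java.util.Arrays;

final class SplitPartitioner {

  private final String[] splitPoints; // sorted ascending

  SplitPartitioner(String... splitPoints) {
    this.splitPoints = splitPoints.clone();
    Arrays.sort(this.splitPoints);
  }

  /** Returns a partition number between 0 and splitPoints.length, inclusive. */
  int partitionFor(String key) {
    int index = Arrays.binarySearch(splitPoints, key);
    // found: key equals split point 'index' and stays in that partition;
    // not found: binarySearch returns -(insertionPoint) - 1
    return index >= 0 ? index : -(index + 1);
  }

  public static void main(String[] args) {
    SplitPartitioner partitioner = new SplitPartitioner("g", "p");
    for (String key : new String[] {"apple", "g", "kiwi", "zebra"}) {
      System.out.println(key + " -> partition " + partitioner.partitionFor(key));
    }
  }
}
```

Two split points yield three partitions. If the split points match the table's existing tablet boundaries, each reducer's output file lines up with one tablet.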
Each Reducer will create a separate RFile, and data must be output from the reduce method in sorted order.
For example, a Reducer can take the following form:
    public static class ReduceClass extends Reducer<Text,Text,Key,Value> {

      public void reduce(Text key, Iterable<Text> values, Context output)
          throws IOException, InterruptedException {

        for (Text value : values) {
          // create outputKey and outputValue
          output.write(outputKey, outputValue);
        }
      }
    }
We’re not emitting a Text and Mutation object, as is done with the AccumuloOutputFormat, but rather, Key and Value objects.
If the for loop does not create output keys in sorted order, you can instead insert the Key, Value pairs into a TreeMap in the for loop, and then iterate over the TreeMap to do the output writes at the end of the reduce method.
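A plain-Java sketch of that buffering approach follows. Strings stand in for Key and Value, and SortedEmitter is a made-up name. Note that buffering holds all of a reduce call's output in memory, which may not be acceptable for very large groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SortedEmitter {

  /**
   * Buffers unsorted key/value pairs in a TreeMap and returns them as
   * "key=value" strings in ascending key order, ready to be written out.
   */
  static List<String> emitSorted(List<String[]> unsortedPairs) {
    TreeMap<String, String> buffer = new TreeMap<>();
    for (String[] pair : unsortedPairs) {
      buffer.put(pair[0], pair[1]); // a later pair with the same key replaces the earlier one
    }
    List<String> output = new ArrayList<>();
    for (Map.Entry<String, String> e : buffer.entrySet()) {
      output.add(e.getKey() + "=" + e.getValue()); // stands in for output.write(key, value)
    }
    return output;
  }

  public static void main(String[] args) {
    List<String[]> pairs = new ArrayList<>();
    pairs.add(new String[] {"row3", "c"});
    pairs.add(new String[] {"row1", "a"});
    pairs.add(new String[] {"row2", "b"});
    System.out.println(emitSorted(pairs));
  }
}
```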
Once our job is finished we can import the RFiles via the importDirectory() method:
    boolean setTimestamps = true;
    conn.tableOperations().importDirectory("table_name", "/inputFiles", "/failedFiles", setTimestamps);
This will move the files into directories associated with the table specified and introduce them to existing tablets.
See “Bulk-loading files from a MapReduce job” for details on using the Accumulo shell to bulk-load files created from MapReduce jobs.
Another reason to use bulk import is to avoid writing duplicate entries into Accumulo tables when a large number of clients are used to write data. The more clients involved in writing data, the higher the chance that one can fail. If clients are simply writing data to Accumulo in response to individual user write requests, this may not be much of a problem. Applications can use conventional load balancers to find a live client and write their data.
However, in a scenario in which clients are writing information from a set of files, for example, the loss of a client makes it likely that only a portion of a file was ingested. If another client is directed to reingest the file, there is a chance that it will create duplicate entries in the table.
One way to avoid this is to make the key-value pairs written for each piece of input data deterministic.
That is to say, each input record is converted into the same set of key-value pairs no matter when or which client is ingesting the record.
This can still result in the same key-value pair getting written more than once, but the VersioningIterator can be configured to ignore all but the latest version of a key-value pair, effectively eliminating duplicates.
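The following plain-Java sketch shows why deterministic keys make reingestion harmless. A TreeMap that keeps one value per key stands in for a table whose VersioningIterator retains only the latest version. The class DeterministicIngest and the choice of an MD5 hex digest of the record as the row ID are assumptions for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

final class DeterministicIngest {

  // key -> latest value; mimics keeping only the newest version of each key
  private final Map<String, String> table = new TreeMap<>();

  /** Derives the row ID purely from the record content, so any client computes the same key. */
  static String rowIdFor(String record) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] digest = md5.digest(record.getBytes(StandardCharsets.UTF_8));
      return String.format("%032x", new BigInteger(1, digest));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 should be available on every Java platform", e);
    }
  }

  void ingest(String record) {
    table.put(rowIdFor(record), record); // rewriting the same key replaces, not duplicates
  }

  int size() {
    return table.size();
  }

  public static void main(String[] args) {
    DeterministicIngest ingest = new DeterministicIngest();
    ingest.ingest("record one");
    ingest.ingest("record two"); // first client fails here, partway through the file
    ingest.ingest("record one"); // second client reingests the file from the start
    ingest.ingest("record two");
    ingest.ingest("record three");
    System.out.println(ingest.size() + " distinct entries");
  }
}
```

Five writes leave three entries, because the two reingested records map to keys that already exist.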
Sometimes creating deterministic key-value pairs is not an option. For example, an application may want to create key-value pairs for an input record that use the timestamp of when the data was ingested as part of the row ID. This would allow data to be read from Accumulo roughly in the order in which it arrived. For more discussion on storing data in time order, see Chapter 9.
In this case, reloading some input records from a partially processed input file would result in duplicate records with different row IDs. Using MapReduce and bulk loading would avoid loading in any key-value pairs from a file that was partially processed when the machine processing it suffered a failure. This can also allow for loading some set of key-value pairs all together as an atomic unit as each RFile is either completed and loaded or discarded, so that another worker can produce a complete file.