Using map-reduce

Map-reduce is a model for processing large sets of data in a parallel, distributed manner. The model consists of a map method, which filters and sorts the data, and a reduce method, which summarizes it. The map-reduce framework is effective because it splits a dataset into smaller pieces and distributes their processing across multiple servers, performing mapping and reduction concurrently; this parallelism is where the significant performance improvements come from. In this section, we will demonstrate the technique using Apache's Hadoop implementation. In the Using Java 8 to perform map-reduce section, we will discuss techniques for performing map-reduce using Java 8 streams.
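Before bringing Hadoop into the picture, the two phases can be sketched in a few lines of plain Java. The following is only a conceptual illustration with our own class and variable names, not part of the Hadoop example that follows: the map phase transforms each raw record into the value we care about, and the reduce phase collapses those values into a single summary.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MapReduceConcept {
    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "Moby Dick\tHerman Melville\t822",
                "Charlotte's Web\tE.B. White\t189",
                "War and Peace\tLeo Tolstoy\t1032");

        // Map phase: extract the page count from each tab-delimited record
        List<Integer> pageCounts = new ArrayList<>();
        for (String record : records) {
            pageCounts.add(Integer.parseInt(record.split("\t")[2]));
        }

        // Reduce phase: summarize the mapped values into one result
        int sum = 0;
        for (int pages : pageCounts) {
            sum += pages;
        }
        System.out.println("Average = " + (double) sum / pageCounts.size());
    }
}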

Hadoop is a software ecosystem providing support for parallel computing. Map-reduce jobs can be run on Hadoop servers, generally set up as clusters, to significantly improve processing speeds. Hadoop has trackers that run map-reduce operations on nodes within a Hadoop cluster. Each node operates independently and the trackers monitor the progress and integrate the output of each node to generate the final output. The following image can be found at http://www.developer.com/java/data/big-data-tool-map-reduce.html and demonstrates the basic map-reduce model with trackers.

Using Apache's Hadoop to perform map-reduce

We are going to show you a very simple example of a map-reduce application here. Before we can use Hadoop, we need to download and extract the Hadoop application files. The latest releases can be found at http://hadoop.apache.org/releases.html. We are using version 2.7.3 for this demonstration.

You will need to set your JAVA_HOME environment variable. Additionally, Hadoop is intolerant of long file paths and spaces within paths, so make sure you extract Hadoop to the simplest directory structure possible.

We will be working with a sample text file containing information about books. Each line of our tab-delimited file has the book title, author, and page count:

Moby Dick  Herman Melville  822
Charlotte's Web  E.B. White  189
The Grapes of Wrath  John Steinbeck  212
Jane Eyre  Charlotte Bronte  299
A Tale of Two Cities  Charles Dickens  673
War and Peace  Leo Tolstoy  1032
The Great Gatsby  F. Scott Fitzgerald  275

We are going to use a map function to extract the title and page count information and then a reduce function to calculate the average page count of the books in our dataset. To begin, create a new class, AveragePageCount. We will create two static classes within AveragePageCount, one to handle the map procedure and one to handle the reduction.
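All of the fragments shown in the rest of this section live inside this one class. For reference, here is a possible skeleton with the imports typically required for a Hadoop 2.x job written against the org.apache.hadoop.mapreduce API; the ellipses mark the pieces filled in over the following sections:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AveragePageCount {

    // The map step, covered in Writing the map method
    public static class TextMapper
            extends Mapper<Object, Text, Text, IntWritable> { ... }

    // The reduce step, covered in Writing the reduce method
    public static class AverageReduce
            extends Reducer<Text, IntWritable, Text, FloatWritable> { ... }

    // Job setup, covered in Creating and executing a new Hadoop job
    public static void main(String[] args) throws Exception { ... }
}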

Writing the map method

First, we will create the TextMapper class, which will implement the map method. This class inherits from the Mapper class and has two private instance variables, pages and bookTitle. pages is an IntWritable object and bookTitle is a Text object. IntWritable and Text are used because these objects will need to be serialized to a byte stream before they can be transmitted to the servers for processing. These objects take up less space and transfer faster than the comparable int or String objects:

public static class TextMapper 
        extends Mapper<Object, Text, Text, IntWritable> { 
 
    private final IntWritable pages = new IntWritable(); 
    private final Text bookTitle = new Text(); 
 
} 

Within our TextMapper class we create the map method. This method takes three parameters: the key object, a Text object named bookInfo, and the Context. The key allows the tracker to map each particular object back to the correct job. The bookInfo object contains the text or string data about each book. Context holds information about the entire system and allows the method to report on progress and update values within the system.

Within the map method, we use the split method to break each piece of book information into an array of String objects. We set our bookTitle variable to position 0 of the array and set pages to the value stored in position 2, after parsing it as an integer. We can then write out our book title and page count information through the context and update our entire system:

    public void map(Object key, Text bookInfo, Context context) 
            throws IOException, InterruptedException { 
        // Split the tab-delimited line into title, author, and page count
        String[] book = bookInfo.toString().split("\t"); 
        bookTitle.set(book[0]); 
        pages.set(Integer.parseInt(book[2])); 
        // Emit a (title, page count) pair for the reduce phase
        context.write(bookTitle, pages); 
    } 

Writing the reduce method

Next, we will write our AverageReduce class. This class extends the Reducer class and performs the reduction processing that calculates our average page count. We have created four fields for this class: a FloatWritable object to hold the final average page count, a Float field average to hold the running average, a Float field count to track how many books have been processed so far, and an int field sum to accumulate the page counts:

public static class AverageReduce 
        extends Reducer<Text, IntWritable, Text, FloatWritable> { 
 
    private final FloatWritable finalAvg = new FloatWritable(); 
    Float average = 0f; 
    Float count = 0f; 
    int sum = 0; 
 
} 

Within our AverageReduce class we will create the reduce method. This method takes as input a Text key, an Iterable object holding writeable integers representing the page counts, and the Context. We use our iterator to process the page counts and add each to our sum. We then calculate the average and set the value of finalAvg. This information is paired with a Text object label and written to the Context:

    public void reduce(Text key, Iterable<IntWritable> pageCnts, 
            Context context) throws IOException, InterruptedException { 
 
        for (IntWritable cnt : pageCnts) { 
            sum += cnt.get(); 
        } 
        count += 1; 
        average = sum / count; 
        finalAvg.set(average); 
        context.write(new Text("Average Page Count = "), finalAvg); 
    } 

Creating and executing a new Hadoop job

We are now ready to create our main method in the same class and execute our map-reduce processes. To do this, we need to create a new Configuration object and a new Job. We then specify the classes the job will use:

public static void main(String[] args) throws Exception { 
    Configuration con = new Configuration(); 
    Job bookJob = Job.getInstance(con, "Average Page Count"); 
    ... 
} 

We set our main class, AveragePageCount, in the setJarByClass method. We specify our TextMapper and AverageReduce classes using the setMapperClass and setReducerClass methods, respectively. We also specify that our output will have a text-based key and a writeable integer using the setOutputKeyClass and setOutputValueClass methods:

    bookJob.setJarByClass(AveragePageCount.class); 
    bookJob.setMapperClass(TextMapper.class); 
    bookJob.setReducerClass(AverageReduce.class); 
    bookJob.setOutputKeyClass(Text.class); 
    bookJob.setOutputValueClass(IntWritable.class); 

Finally, we create new input and output paths using the addInputPath and setOutputPath methods. These methods both take our Job object as the first parameter and a Path object representing our input and output file locations as the second parameter. We then call waitForCompletion. Our application exits once this call returns true:

    FileInputFormat.addInputPath(bookJob, new Path("C:/Hadoop/books.txt")); 
    FileOutputFormat.setOutputPath(bookJob, new Path("C:/Hadoop/BookOutput")); 
    if (bookJob.waitForCompletion(true)) { 
        System.exit(0); 
    } 

To execute the application, open a command prompt and navigate to the directory containing our AveragePageCount.class file. We then use the following command to execute our sample application:

hadoop AveragePageCount 

While our task is running, progress information about the job is written to the screen. A sample of this output is shown as follows:

... 
File System Counters
    FILE: Number of bytes read=1132
    FILE: Number of bytes written=569686
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=7
    Map output records=7
    Map output bytes=136
    Map output materialized bytes=156
    Input split bytes=90
    Combine input records=0
    Combine output records=0
    Reduce input groups=7            
    Reduce shuffle bytes=156           
    Reduce input records=7           
    Reduce output records=7            
    Spilled Records=14            
    Shuffled Maps =1            
    Failed Shuffles=0            
    Merged Map outputs=1            
    GC time elapsed (ms)=11            
    Total committed heap usage (bytes)=536870912            
Shuffle Errors
    BAD_ID=0            
    CONNECTION=0           
    IO_ERROR=0           
    WRONG_LENGTH=0            
    WRONG_MAP=0            
    WRONG_REDUCE=0            
File Input Format Counters
    Bytes Read=249            
File Output Format Counters
     Bytes Written=216           

If we open the BookOutput directory created on our local machine, we find four new files. Use a text editor to open part-r-00000. This file contains information about the average page count as it was calculated using parallel processes. A sample of this output follows:

Average Page Count =   673.0
Average Page Count =   431.0
Average Page Count =   387.0
Average Page Count =   495.75
Average Page Count =   439.0
Average Page Count =   411.66666
Average Page Count =   500.2857

Notice how the average changes as each individual process is combined with the other reduction processes. This has the same effect as calculating the average of the first two books first, then adding in the third book, then the fourth, and so on. The advantage here of course is that the averaging is done in a parallel manner. If we had a huge dataset, we should expect to see a noticeable advantage in execution time. The last line of BookOutput reflects the correct and final average of all seven page counts.
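The intermediate values are easy to reproduce by hand. Because Hadoop sorts the keys before the reduce phase, the reducer sees the books in alphabetical order by title, and each call adds one more page count to the shared sum. The following stand-alone sketch, which is our own illustration and not part of the Hadoop job, prints the same running averages:

public class RunningAverageDemo { 
    public static void main(String[] args) { 
        // Page counts in the order the reducer receives the keys 
        // (sorted alphabetically by book title) 
        int[] pageCounts = {673, 189, 299, 822, 212, 275, 1032}; 
        int sum = 0; 
        float count = 0f; 
        for (int pages : pageCounts) { 
            sum += pages; 
            count += 1; 
            System.out.println("Average Page Count = " + (sum / count)); 
        } 
    } 
} 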
