Time for action – reduce-side join using MultipleInputs

We can perform the report explained in the previous section using a reduce-side join by performing the following steps:

  1. Create the following tab-separated file and name it sales.txt:
    001	35.99	2012-03-15
    002	12.49	2004-07-02
    004	13.42	2005-12-20
    003	499.99	2010-12-20
    001	78.95	2012-04-02
    002	21.99	2006-11-30
    002	93.45	2008-09-10
    001	9.99	2012-05-17
  2. Create the following tab-separated file and name it accounts.txt:
    001	John Allen	Standard	2012-03-15
    002	Abigail Smith	Premium	2004-07-13
    003	April Stevens	Standard	2010-12-20
    004	Nasser Hafez	Premium	2001-04-23
  3. Copy the datafiles onto HDFS.
    $ hadoop fs -mkdir sales
    $ hadoop fs -put sales.txt sales/sales.txt
    $ hadoop fs -mkdir accounts
    $ hadoop fs -put accounts.txt accounts/accounts.txt
    
  4. Create the following file and name it ReduceJoin.java:
    import java.io.*;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.*;

    public class ReduceJoin
    {
        public static class SalesRecordMapper
            extends Mapper<Object, Text, Text, Text>
        {
            public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException
            {
                // Sales records: account ID, sale value, date
                String record = value.toString();
                String[] parts = record.split("\t");

                context.write(new Text(parts[0]),
                    new Text("sales\t" + parts[1]));
            }
        }

        public static class AccountRecordMapper
            extends Mapper<Object, Text, Text, Text>
        {
            public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException
            {
                // Account records: account ID, client name, type, date opened
                String record = value.toString();
                String[] parts = record.split("\t");

                context.write(new Text(parts[0]),
                    new Text("accounts\t" + parts[1]));
            }
        }

        public static class ReduceJoinReducer
            extends Reducer<Text, Text, Text, Text>
        {
            public void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException
            {
                String name = "";
                double total = 0.0;
                int count = 0;

                for (Text t : values)
                {
                    String[] parts = t.toString().split("\t");

                    if (parts[0].equals("sales"))
                    {
                        count++;
                        total += Float.parseFloat(parts[1]);
                    }
                    else if (parts[0].equals("accounts"))
                    {
                        name = parts[1];
                    }
                }

                String str = String.format("%d\t%f", count, total);
                context.write(new Text(name), new Text(str));
            }
        }

        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "Reduce-side join");
            job.setJarByClass(ReduceJoin.class);
            job.setReducerClass(ReduceJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, SalesRecordMapper.class);
            MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, AccountRecordMapper.class);

            Path outputPath = new Path(args[2]);
            FileOutputFormat.setOutputPath(job, outputPath);
            outputPath.getFileSystem(conf).delete(outputPath, true);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  5. Compile the file and add it to a JAR file.
    $ javac ReduceJoin.java
    $ jar -cvf join.jar *.class
    
  6. Run the job by executing the following command:
    $ hadoop jar join.jar ReduceJoin sales accounts outputs
    
  7. Examine the result file.
    $ hadoop fs -cat /user/garry/outputs/part-r-00000
    John Allen	3	124.929998
    Abigail Smith	3	127.929996
    April Stevens	1	499.989990
    Nasser Hafez	1	13.420000
    

What just happened?

Firstly, we created the datafiles to be used in this example. We created two small data sets as this makes it easier to track the result output. The account details data set has four columns, as follows:

  • The account ID
  • The client name
  • The type of account
  • The date the account was opened

The sales record data set has three columns:

  • The account ID of the purchaser
  • The value of the sale
  • The date of the sale

Naturally, real account and sales records would have many more fields than the ones mentioned here. After creating the files, we placed them onto HDFS.

We then created the ReduceJoin.java file, which looks very much like the previous MapReduce jobs we have used. There are a few aspects to this job that make it special and allow us to implement a join.

Firstly, the class has two defined mappers. As we have seen before, jobs can have multiple mappers executed in a chain; but in this case, we wish to apply a different mapper to each of the input locations. Accordingly, we defined the SalesRecordMapper and AccountRecordMapper classes to handle the sales and account data respectively. We used the MultipleInputs class from the org.apache.hadoop.mapreduce.lib.input package as follows:

MultipleInputs.addInputPath(job, new Path(args[0]), 
TextInputFormat.class, SalesRecordMapper.class) ;
MultipleInputs.addInputPath(job, new Path(args[1]), 
TextInputFormat.class, AccountRecordMapper.class) ;

As you can see, unlike in previous examples where we add a single input location, the MultipleInputs class allows us to add multiple sources and associate each with a distinct input format and mapper.

The mappers are pretty straightforward; the SalesRecordMapper class emits an output of the form <account number>, <sales value> while the AccountRecordMapper class emits an output of the form <account number>, <client name>. We therefore have the order value and client name for each sale being passed into the reducer where the actual join will happen.

Notice that both mappers actually emit more than just the required values. The SalesRecordMapper class prefixes its value output with the tag sales, while the AccountRecordMapper class uses the tag accounts.

If we look at the reducer, we can see why this is so. The reducer retrieves each record for a given key, but without these explicit tags we would not know if a given value came from the sales or account mapper and hence would not understand how to treat the data value.

The ReduceJoinReducer class therefore treats the values in the Iterable object differently, depending on which mapper they came from. Values from the AccountRecordMapper class (and there should be only one) are used to populate the client name in the final output. For the sales records (there are likely to be several, as most clients buy more than a single item), the total number of orders is counted, as is their overall combined value. The output from the reducer is therefore a key of the account holder name and a value string containing the number of orders and the total order value.

We compile and execute the class; notice how we provide three arguments representing the two input directories as well as the single output directory. Because of how the MultipleInputs class is configured, we must also ensure we specify the directories in the right order; there is no dynamic mechanism to determine which type of file is in which location.

After execution, we examine the output file and confirm that it does indeed contain the overall totals for named clients as expected.

DataJoinMapperBase and TaggedMapOutput

There is a way of implementing a reduce-side join in a more sophisticated and object-oriented fashion. Within the org.apache.hadoop.contrib.utils.join package are classes such as DataJoinMapperBase and TaggedMapOutput that provide an encapsulated means of deriving the tags for map output and having them processed at the reducer. This mechanism means you don't have to define explicit tag strings as we did previously and then carefully parse out the data received at the reducer to determine from which mapper the data came; there are methods in the provided classes that encapsulate this functionality.

This capability is particularly valuable when using numeric or other non-textual data. To create our own explicit tags as in the previous example, we would have to convert types such as integers into strings just so we could add the required prefix tag. This is less efficient than keeping the numeric types in their normal form and relying on the additional classes to implement the tagging.

The framework also allows for quite sophisticated tag generation, as well as concepts such as tag grouping that we didn't implement previously. Using this mechanism requires extra work, including overriding further methods and using a different map base class. For straightforward joins such as in the previous example, this framework may be overkill, but if you find yourself implementing very complex tagging logic, it may be worth a look.
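
To give a flavor of the approach, the following is a rough sketch only: the class names are illustrative, the contrib classes are built on the older org.apache.hadoop.mapred API, and exact signatures may vary between Hadoop versions. A TaggedMapOutput subclass wraps each record together with its tag, while the DataJoinMapperBase subclass only has to say how to derive the tag, the group (join) key, and the tagged record:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class ContribJoinSketch
{
    // Wraps a raw record together with the tag assigned to its input source
    public static class TaggedWritable extends TaggedMapOutput
    {
        private Text data;

        public TaggedWritable()
        {
            this.data = new Text();
        }

        public TaggedWritable(Text data)
        {
            this.data = data;
        }

        public Writable getData()
        {
            return data;
        }

        public void write(DataOutput out) throws IOException
        {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException
        {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public static class JoinMapper extends DataJoinMapperBase
    {
        // Derive the tag from the input file location rather than hardcoding it
        protected Text generateInputTag(String inputFile)
        {
            return new Text(inputFile.contains("sales") ? "sales" : "accounts");
        }

        // The group (join) key is the account ID in the first tab-separated field
        protected Text generateGroupKey(TaggedMapOutput aRecord)
        {
            String line = ((Text) aRecord.getData()).toString();
            return new Text(line.split("\t")[0]);
        }

        // Wrap each record; inputTag is set by the framework for this source
        protected TaggedMapOutput generateTaggedMapOutput(Object value)
        {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }
}

The reducer side would then extend DataJoinReducerBase, whose combine() method is overridden to merge the group of tagged values seen for each join key.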

Implementing map-side joins

For a join to occur at a given point, we must have access to the appropriate records from each data set at that point. This is where the simplicity of the reduce-side join comes into its own; though it incurs the expense of additional network traffic, it by definition ensures that the reducer receives all records associated with the join key.

If we wish to perform our join in the mapper, it isn't as easy to make this condition hold true. We can't assume that our input data is sufficiently well structured to allow associated records to be read simultaneously. We generally have two classes of approach here: obviate the need to read from multiple external sources, or preprocess the data so that it is amenable to map-side joining.

Using the Distributed Cache

The simplest way of realizing the first approach is to take all but one data set and make it available in the Distributed Cache that we used in the previous chapter. The approach can be used for multiple data sources, but for simplicity let's discuss just two.

If we have one large data set and one smaller one, such as the sales and account data seen earlier, one option would be to package up the account data and push it into the Distributed Cache. Each mapper would then read this data into an efficient data structure, such as a hash table that uses the join key as the hash key. The sales records are then processed, and during the processing of each record the required account information can be retrieved from the hash table.

This mechanism is very effective; when the smaller data set can easily fit into memory, it is a great approach. However, we are not always that lucky, and sometimes the smallest data set is still too large to be copied to every worker machine and held in memory.
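
As a partial sketch of what this looks like in practice (the class name is illustrative, and it assumes the driver has already placed accounts.txt into the Distributed Cache as we did in the previous chapter), the mapper loads the account data into a hash table in its setup() method and then performs the join while processing each sales record:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<Object, Text, Text, Text>
{
    // Account ID -> client name, built from the cached accounts file
    private Map<String, String> accounts = new HashMap<String, String>();

    protected void setup(Context context)
        throws IOException, InterruptedException
    {
        Path[] cached = DistributedCache.getLocalCacheFiles(
            context.getConfiguration());
        if (cached != null && cached.length > 0)
        {
            BufferedReader reader =
                new BufferedReader(new FileReader(cached[0].toString()));
            String line;
            while ((line = reader.readLine()) != null)
            {
                String[] parts = line.split("\t");
                accounts.put(parts[0], parts[1]);
            }
            reader.close();
        }
    }

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException
    {
        // Each input record is a sales line: account ID, sale value, date
        String[] parts = value.toString().split("\t");
        String name = accounts.get(parts[0]);
        if (name != null)
        {
            context.write(new Text(name), new Text(parts[1]));
        }
    }
}

With this structure the driver needs only the sales data as its input path, and no account records are shuffled across the network at all.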

Have a go hero - Implementing map-side joins

Take the previous sales/account record example and implement a map-side join using the Distributed Cache. If you load the account records into a hash table that maps account ID numbers to client names, you can use the account ID to retrieve the client name. Do this within the mapper while processing the sales records.

Pruning data to fit in the cache

If the smallest data set is still too big to be used in the Distributed Cache, all is not necessarily lost. Our earlier example, for instance, extracted only two fields from each record and discarded the other fields not required by the job. In reality, an account will be described by many attributes, and this sort of reduction will limit the data size dramatically. Often the data available to Hadoop is this full data set, but what we need is only a subset of the fields.

In such a case, therefore, it may be possible to extract from the full data set only the fields that are needed during the MapReduce job, and in doing so create a pruned data set that is small enough to be used in the cache.

Note

This is very similar to the concept underlying column-oriented databases. Traditional relational databases store data a row at a time, meaning that the full row needs to be read to extract a single column. A column-based database instead stores each column separately, allowing a query to read only the columns in which it is interested.

If you take this approach, you need to consider what mechanism will be used to generate the data subset and how often this will be done. The obvious approach is to write another MapReduce job that does the necessary filtering, with its output then used in the Distributed Cache for the follow-on job. If the smaller data set changes only rarely, you may be able to get away with generating the pruned data set on a scheduled basis; for example, refresh it every night. Otherwise, you will need to make a chain of two MapReduce jobs: one to produce the pruned data set and the other to perform the join operation using the large set and the data in the Distributed Cache.
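
A minimal sketch of such a pruning job is shown below (the class name is illustrative, and it assumes the tab-separated account file used earlier). Setting the number of reduce tasks to zero makes it a map-only job, so the mapper output is written straight to HDFS:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PruneAccounts
{
    public static class PruneMapper extends Mapper<Object, Text, Text, Text>
    {
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException
        {
            // Keep only the account ID and client name; drop all other columns
            String[] parts = value.toString().split("\t");
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Prune account data");
        job.setJarByClass(PruneAccounts.class);
        job.setMapperClass(PruneMapper.class);
        job.setNumReduceTasks(0); // map-only: no shuffle, no reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}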

Using a data representation instead of raw data

Sometimes, one of the data sources is not used to retrieve additional data but is instead used to derive some fact that is then used in a decision process. We may, for example, be looking to filter sales records to extract only those for which the shipping address was in a specific locale.

In such a case, we can reduce the required data size down to a list of the applicable sales records that may more easily fit into the cache. We can again store it as a hash table, where we are just recording the fact that the record is valid, or even use something like a sorted list or a tree. In cases where we can accept some false positives while still guaranteeing no false negatives, a Bloom filter provides an extremely compact way of representing such information.
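
Hadoop ships its own Bloom filter implementation in the org.apache.hadoop.util.bloom package; the following is a small sketch of the idea, with arbitrary illustrative sizing (the bit-vector size and number of hash functions should really be derived from the expected number of keys and the acceptable false-positive rate):

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterSketch
{
    public static void main(String[] args)
    {
        // 100,000-bit vector with 5 hash functions; sizing is illustrative only
        BloomFilter filter = new BloomFilter(100000, 5, Hash.MURMUR_HASH);

        // Record the account IDs that satisfy the filtering condition
        filter.add(new Key("001".getBytes()));
        filter.add(new Key("004".getBytes()));

        // Later, in a mapper: a negative answer is definite, while a positive
        // answer may occasionally be a false positive
        System.out.println(filter.membershipTest(new Key("001".getBytes())));
        System.out.println(filter.membershipTest(new Key("002".getBytes())));
    }
}

Because the filter class implements Writable, it can be serialized to a file and shipped to each mapper through the Distributed Cache.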

As can be seen, applying this approach to enable a map-side join requires creativity, and not a little luck, with regard to the nature of the data set and the problem at hand. But remember that the best relational database administrators spend significant time optimizing queries to remove unnecessary data processing; so it's never a bad idea to ask if you truly need to process all that data.

Using multiple mappers

Fundamentally, the previous techniques are trying to remove the need for a full cross data set join. But sometimes this is what you have to do; you may simply have very large data sets that cannot be combined in any of these clever ways.

There are classes within the org.apache.hadoop.mapreduce.lib.join package that support this situation. The main class of interest is CompositeInputFormat, which applies a user-defined function to combine records from multiple data sources.

The main limitation of this approach is that the data sources must already be indexed based on the common key, in addition to being both sorted and partitioned in the same way. The reason for this is simple: when reading from each source, the framework needs to know if a given key is present at each location. If we know that each partition is sorted and contains the same key range, simple iteration logic can do the required matching.

This situation is obviously not going to happen by accident, so again you may find yourself writing preprocessing jobs to transform all the input data sources into the correct sort and partition structure.
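
As a rough sketch only (the configuration key name and the exact classes differ between the old and new MapReduce APIs and between Hadoop versions, so treat the details here as assumptions to verify against your release), the driver builds a join expression describing the pre-sorted, identically partitioned sources and sets CompositeInputFormat as the job's input format:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

public class CompositeJoinSketch
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        // Describe an inner join over two inputs; "outer" is also supported.
        // The key name below is the new-API form; the old API uses
        // "mapred.join.expr".
        conf.set("mapreduce.join.expr",
            CompositeInputFormat.compose("inner",
                KeyValueTextInputFormat.class,
                new Path(args[0]), new Path(args[1])));

        Job job = new Job(conf, "Composite join");
        job.setInputFormatClass(CompositeInputFormat.class);
        // The mapper then receives each join key together with a
        // TupleWritable holding the matching value from each source.
        // ... the remaining mapper and output configuration is omitted,
        // as it follows the same pattern as the earlier drivers ...
    }
}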

Note

This discussion starts to touch on distributed and parallel join algorithms; both topics are of extensive academic and commercial research. If you are interested in the ideas and want to learn more of the underlying theory, go searching on http://scholar.google.com.

To join or not to join...

After our tour of joins in the MapReduce world, let's come back to the original question: are you really sure you want to be doing this? The choice is often between a relatively easily implemented yet inefficient reduce-side join, and more efficient but more complex map-side alternatives. We have seen that joins can indeed be implemented in MapReduce, but they aren't always pretty. This is why we advise the use of something like Hive or Pig if these types of problems comprise a large portion of your workload. Such tools do their own translation into MapReduce code under the hood and directly implement both map-side and reduce-side joins, and it is usually better to rely on a well-engineered and well-optimized library for these workloads than to build your own. That is, after all, why you are using Hadoop and not writing your own distributed processing framework!
