Time for action – creating the source code

We'll now look at the source code that implements our graph traversal. Because the code is lengthy, we'll break it into multiple steps; all of the steps belong together in a single source file.

  1. Create the following as GraphPath.java with these imports:
    import java.io.* ;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    
    public class GraphPath
    {
  2. Create an inner class to hold an object-oriented representation of a node:
        // Inner class to represent a node
        public static class Node
        {
            // The node id
            private String id ;
            // The ids of all nodes this node has a path to
            private String neighbours ;
            // The distance of this node to the starting node
            private int distance ;
            // The current node state
            private String state ;

            // Parse the text file representation into a Node object
            Node(Text t)
            {
                String[] parts = t.toString().split("\t") ;
                this.id = parts[0] ;
                this.neighbours = parts[1] ;
                if (parts.length < 3 || parts[2].equals(""))
                    this.distance = -1 ;
                else
                    this.distance = Integer.parseInt(parts[2]) ;

                if (parts.length < 4 || parts[3].equals(""))
                    this.state = "P" ;
                else
                    this.state = parts[3] ;
            }

            // Create a node from a key and value object pair
            Node(Text key, Text value)
            {
                this(new Text(key.toString()+"\t"+value.toString())) ;
            }

            public String getId()
            {
                return this.id ;
            }

            public String getNeighbours()
            {
                return this.neighbours ;
            }

            public int getDistance()
            {
                return this.distance ;
            }

            public String getState()
            {
                return this.state ;
            }
        }
  3. Create the mapper for the job. The mapper will create a new Node object for its input and then examine it, and based on its state do the appropriate processing.
        public static class GraphPathMapper
            extends Mapper<Object, Text, Text, Text>
        {

            public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException
            {
                Node n = new Node(value) ;

                if (n.getState().equals("C"))
                {
                    // Output the node with its state changed to Done
                    context.write(new Text(n.getId()),
                        new Text(n.getNeighbours()+"\t"+n.getDistance()+"\tD")) ;

                    for (String neighbour : n.getNeighbours().split(","))
                    {
                        // Output each neighbour as a Currently processing node
                        // Increment the distance by 1; it is one link further away
                        context.write(new Text(neighbour),
                            new Text("\t"+(n.getDistance()+1)+"\tC")) ;
                    }
                }
                else
                {
                    // Output a pending or done node unchanged
                    context.write(new Text(n.getId()),
                        new Text(n.getNeighbours()+"\t"+n.getDistance()
                            +"\t"+n.getState())) ;
                }
            }
        }
  4. Create the reducer for the job. As with the mapper, this reads in a representation of a node and produces different output depending on the state of the node. The basic approach is to collect from the input the largest value for the state and distance columns (treating the states as ordered P, C, D), and through this converge to the final solution.
        public static class GraphPathReducer
            extends Reducer<Text, Text, Text, Text>
        {

            public void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException
            {
                // Set some default values for the final output
                String neighbours = "" ;
                int distance = -1 ;
                String state = "P" ;

                for (Text t : values)
                {
                    Node n = new Node(key, t) ;

                    if (n.getState().equals("D"))
                    {
                        // A done node should be the final output; ignore the
                        // remaining values
                        neighbours = n.getNeighbours() ;
                        distance = n.getDistance() ;
                        state = n.getState() ;
                        break ;
                    }

                    // Select the list of neighbours when found. Records emitted
                    // for a neighbour carry an empty list, never null, so test
                    // for emptiness to avoid overwriting a real list
                    if (!n.getNeighbours().isEmpty())
                        neighbours = n.getNeighbours() ;

                    // Select the largest distance
                    if (n.getDistance() > distance)
                        distance = n.getDistance() ;

                    // Select the highest remaining state
                    if (n.getState().equals("D") ||
                        (n.getState().equals("C") && state.equals("P")))
                        state = n.getState() ;
                }

                // Output a new node representation from the collected parts
                context.write(key,
                    new Text(neighbours+"\t"+distance+"\t"+state)) ;
            }
        }
  5. Create the job driver:
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "graph path");
            job.setJarByClass(GraphPath.class);
            job.setMapperClass(GraphPathMapper.class);
            job.setReducerClass(GraphPathReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

What just happened?

The job here implements the previously described algorithm that we'll execute in the following sections. The job setup is pretty standard, and apart from the algorithm definition the only new thing here is the use of an inner class to represent nodes.
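To make the data flow of a single pass easier to follow, here is a minimal sketch that imitates one map and reduce cycle in plain Java, without Hadoop. The class name GraphPassSketch, its helper methods, and the four-node sample graph are all assumptions made for illustration; the grouping of values by key stands in for the shuffle that Hadoop performs between the two phases. The sketch keeps a neighbour list only when it is non-empty, which is its own choice rather than something taken from the job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GraphPassSketch {

    // Hypothetical helper: split "id<TAB>neighbours<TAB>distance<TAB>state"
    // into four fields, defaulting distance to -1 and state to "P".
    public static String[] parse(String line) {
        String[] parts = line.split("\t");
        String neighbours = parts.length > 1 ? parts[1] : "";
        String distance = (parts.length < 3 || parts[2].isEmpty()) ? "-1" : parts[2];
        String state = (parts.length < 4 || parts[3].isEmpty()) ? "P" : parts[3];
        return new String[] { parts[0], neighbours, distance, state };
    }

    // Map step: a "C" node is re-emitted as "D" and sends distance+1 to each
    // neighbour; any other node is passed through unchanged.
    public static List<String[]> map(String line) {
        String[] n = parse(line);
        List<String[]> out = new ArrayList<>();
        if (n[3].equals("C")) {
            out.add(new String[] { n[0], n[1] + "\t" + n[2] + "\tD" });
            int next = Integer.parseInt(n[2]) + 1;
            for (String neighbour : n[1].split(",")) {
                out.add(new String[] { neighbour, "\t" + next + "\tC" });
            }
        } else {
            out.add(new String[] { n[0], n[1] + "\t" + n[2] + "\t" + n[3] });
        }
        return out;
    }

    // Reduce step: merge all values for one node id into a single record.
    public static String reduce(String key, List<String> values) {
        String neighbours = "";
        int distance = -1;
        String state = "P";
        for (String v : values) {
            String[] n = parse(key + "\t" + v);
            if (n[3].equals("D")) {
                // A done record wins outright
                neighbours = n[1];
                distance = Integer.parseInt(n[2]);
                state = "D";
                break;
            }
            if (!n[1].isEmpty()) {
                neighbours = n[1];
            }
            distance = Math.max(distance, Integer.parseInt(n[2]));
            if (n[3].equals("C") && state.equals("P")) {
                state = "C";
            }
        }
        return key + "\t" + neighbours + "\t" + distance + "\t" + state;
    }

    // One full pass: map every line, group by key, then reduce each group.
    public static List<String> runPass(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String[] kv : map(line)) {
                grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
            }
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            out.add(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Sample graph (an assumption): node 1 is the start, at distance 0
        List<String> graph = Arrays.asList("1\t2,3\t0\tC", "2\t1,4", "3\t1", "4\t2");
        for (String line : runPass(graph)) {
            System.out.println(line);
        }
    }
}
```

With this sample input, a single pass marks node 1 as done at distance 0, moves nodes 2 and 3 to the currently processing state at distance 1, and leaves node 4 pending. Feeding the output back into runPass advances the frontier by one more link, which is why the real job has to be run repeatedly.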

The input to a mapper or reducer is often a flattened representation of a more complex structure or object. We could just use that representation, but in this case this would result in the mapper and reducer bodies being full of text and string manipulation code that would obscure the actual algorithm.

The Node inner class encapsulates the mapping from the flat file representation to an object that makes sense in terms of the business domain. This also makes the mapper and reducer logic clearer, as comparisons between object attributes are more semantically meaningful than comparisons with slices of a string identified only by absolute index positions.
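The difference between index-based access and a named attribute can be seen in a small standalone sketch. NodeRecordSketch is a hypothetical stand-in for the Node class that works on a plain String rather than a Hadoop Text object; the isCurrent() and toLine() helpers and the guard for a missing neighbour column are assumptions added for illustration and are not part of the job's code.

```java
public class NodeRecordSketch {

    public final String id;
    public final String neighbours;
    public final int distance;
    public final String state;

    // Parse "id<TAB>neighbours<TAB>distance<TAB>state"; the last two
    // columns are optional and default to -1 and "P" respectively.
    public NodeRecordSketch(String line) {
        String[] parts = line.split("\t");
        this.id = parts[0];
        this.neighbours = parts.length > 1 ? parts[1] : "";
        this.distance = (parts.length < 3 || parts[2].isEmpty())
            ? -1 : Integer.parseInt(parts[2]);
        this.state = (parts.length < 4 || parts[3].isEmpty())
            ? "P" : parts[3];
    }

    // Hypothetical convenience method: is this node currently being processed?
    public boolean isCurrent() {
        return state.equals("C");
    }

    // Hypothetical helper: flatten the object back into one text line
    public String toLine() {
        return id + "\t" + neighbours + "\t" + distance + "\t" + state;
    }

    public static void main(String[] args) {
        String line = "7\t3,9\t2\tC";

        // Index-based check: the reader must know that column 3 is the state
        boolean viaIndex = line.split("\t")[3].equals("C");

        // Attribute-based check: the intent is visible in the method name
        NodeRecordSketch node = new NodeRecordSketch(line);
        boolean viaObject = node.isCurrent();

        System.out.println(viaIndex == viaObject);

        // A record with only id and neighbours picks up the defaults
        System.out.println(new NodeRecordSketch("4\t2").toLine());
    }
}
```

Both checks give the same answer, but only the second one survives a change to the column order without every call site having to be revisited.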
