We'll now look at the source code that implements our graph traversal. Because the code is lengthy, we'll break it into multiple steps; remember that they all belong together in a single source file.
Create the following as GraphPath.java, with these imports:

import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class GraphPath
{
    // Inner class to represent a node
    public static class Node
    {
        // The integer node id
        private String id;
        // The ids of all nodes this node has a path to
        private String neighbours;
        // The distance of this node to the starting node
        private int distance;
        // The current node state
        private String state;

        // Parse the text file representation into a Node object
        Node(Text t)
        {
            String[] parts = t.toString().split(" ");
            this.id = parts[0];
            this.neighbours = parts[1];

            if (parts.length < 3 || parts[2].equals(""))
                this.distance = -1;
            else
                this.distance = Integer.parseInt(parts[2]);

            if (parts.length < 4 || parts[3].equals(""))
                this.state = "P";
            else
                this.state = parts[3];
        }

        // Create a node from a key and value object pair
        Node(Text key, Text value)
        {
            this(new Text(key.toString() + " " + value.toString()));
        }

        public String getId()
        {
            return this.id;
        }

        public String getNeighbours()
        {
            return this.neighbours;
        }

        public int getDistance()
        {
            return this.distance;
        }

        public String getState()
        {
            return this.state;
        }
    }
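To make the flat-file format concrete, here is what a hypothetical input for a small graph might look like; the node ids and adjacency lists are illustrative only. Each line holds a node id, a comma-separated neighbour list, an optional distance, and an optional state. Node 1 is the starting node, so it begins with distance 0 and state C (currently processing), while the others default to distance -1 and the pending state P:

1 2,3,4 0 C
2 1,4
3 1,5
4 1,2
5 3,6
6 5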
The mapper creates a Node object from each input record, examines it, and based on its state performs the appropriate processing:

    public static class GraphPathMapper
        extends Mapper<Object, Text, Text, Text>
    {
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException
        {
            Node n = new Node(value);

            if (n.getState().equals("C"))
            {
                // Output the node with its state changed to Done
                context.write(new Text(n.getId()),
                    new Text(n.getNeighbours() + " " + n.getDistance() + " D"));

                for (String neighbour : n.getNeighbours().split(","))
                {
                    // Output each neighbour as a Currently processing node;
                    // increment the distance by 1 as it is one link further away
                    context.write(new Text(neighbour),
                        new Text(" " + (n.getDistance() + 1) + " C"));
                }
            }
            else
            {
                // Output a pending or done node unchanged
                context.write(new Text(n.getId()),
                    new Text(n.getNeighbours() + " " + n.getDistance()
                        + " " + n.getState()));
            }
        }
    }
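As an illustrative walk-through using the hypothetical input above, the mapper that receives the line for node 1 (state C) would emit the node itself marked as done, plus one currently-processing record per neighbour; nodes 2, 3, and 4 each get distance 1, as they are one link further from the start:

1  ->  2,3,4 0 D
2  ->   1 C
3  ->   1 C
4  ->   1 C

Note that a neighbour record's value starts with a space: its neighbours field is deliberately left empty, since only the original node record knows the adjacency list.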
    public static class GraphPathReducer
        extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException
        {
            // Set some default values for the final output
            String neighbours = null;
            int distance = -1;
            String state = "P";

            for (Text t : values)
            {
                Node n = new Node(key, t);

                if (n.getState().equals("D"))
                {
                    // A done node should be the final output; ignore
                    // the remaining values
                    neighbours = n.getNeighbours();
                    distance = n.getDistance();
                    state = n.getState();
                    break;
                }

                // Select the list of neighbours when found; records emitted
                // for a neighbour carry an empty list, so skip those
                if (n.getNeighbours() != null && !n.getNeighbours().isEmpty())
                    neighbours = n.getNeighbours();

                // Select the largest distance
                if (n.getDistance() > distance)
                    distance = n.getDistance();

                // Select the highest remaining state
                if (n.getState().equals("D") ||
                    (n.getState().equals("C") && state.equals("P")))
                    state = n.getState();
            }

            // Output a new node representation from the collected parts
            context.write(key,
                new Text(neighbours + " " + distance + " " + state));
        }
    }
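Continuing the hypothetical example, the reducer for key 2 receives two values: the node's own unchanged record ("1,4 -1 P") and the neighbour record emitted by node 1 (" 1 C"). Merging them keeps the non-empty neighbour list, the larger distance, and the higher state, so node 2 emerges as "2 1,4 1 C" and will be expanded on the next iteration.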
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "graph path");
        job.setJarByClass(GraphPath.class);
        job.setMapperClass(GraphPathMapper.class);
        job.setReducerClass(GraphPathReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
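Each run of this driver performs a single iteration of the algorithm, so in practice the job is invoked repeatedly, feeding each run's output directory in as the next run's input, until no nodes remain in the C state. As a sketch, where the jar name and directory names are illustrative only:

$ hadoop jar graphpath.jar GraphPath graphin graphout1
$ hadoop jar graphpath.jar GraphPath graphout1 graphout2

We'll walk through the actual execution in the following sections.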
This job implements the previously described algorithm, which we'll execute in the following sections. The job setup is quite standard, and apart from the algorithm definition itself the only new aspect here is the use of an inner class to represent nodes.
The input to a mapper or reducer is often a flattened representation of a more complex structure or object. We could work with that representation directly, but here it would result in the mapper and reducer bodies being full of text- and string-manipulation code that would obscure the actual algorithm.
The use of the Node inner class allows the mapping from the flat-file representation to an object representation to be encapsulated in a class that makes sense in terms of the business domain. This also makes the mapper and reducer logic clearer, as comparisons between object attributes are more semantically meaningful than comparisons with slices of a string identified only by absolute index positions.
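As a small sketch of the contrast, consider these two helper methods; both are hypothetical, written only for illustration, and not part of the job above:

    // Index-based access: the state lives at a "magic" offset that every
    // piece of code must remember and guard independently
    private static boolean isProcessingByIndex(Text value)
    {
        String[] parts = value.toString().split(" ");
        return parts.length > 3 && parts[3].equals("C");
    }

    // Node-based access: the same check reads in domain terms
    private static boolean isProcessingByNode(Text value)
    {
        return new Node(value).getState().equals("C");
    }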