Single-source shortest-path with Apache Giraph

In this recipe, we will implement a variant of Google Pregel's shortest-path algorithm over a network of employees connected in an acyclic directed graph. The code takes a single source ID and, for every vertex in the graph, marks the minimum number of hops required to reach that vertex from the source vertex. The employee network is stored in HDFS as a line-separated list of RDF triples. Resource Description Framework (RDF) is a very effective data format for representing entities and the relationships between them.

Getting ready

Make sure you have a basic familiarity with Google Pregel/BSP and the Giraph API.

You will need access to a pseudo-distributed Hadoop cluster. The code listed in this recipe uses a non-split master-worker configuration that is not ideal in fully-distributed environments. It also assumes that you have familiarity with bash shell scripting.

You will need to load the example dataset gooftech.tsv to an HDFS folder located at /input/gooftech.

You will also need to package this code inside a JAR file to be executed by the Hadoop JAR launcher from the shell. The shell script listed in the recipe shows a template for job submission with the correct classpath dependencies.

How to do it...

Carry out the following steps to implement the shortest path in Giraph:

  1. First, we define our custom InputFormat that extends TextVertexInputFormat to read the employee RDF triples from text. Save the class as EmployeeRDFTextInputFormat.java in a package of your choice:
    import com.google.common.collect.Maps;
    import org.apache.giraph.graph.BspUtils;
    import org.apache.giraph.graph.Vertex;
    import org.apache.giraph.graph.VertexReader;
    import org.apache.giraph.lib.TextVertexInputFormat;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.regex.Pattern;
    
    
    public class EmployeeRDFTextInputFormat extends
                 TextVertexInputFormat<Text, IntWritable, NullWritable, IntWritable> {
    
      @Override
      public VertexReader<Text, IntWritable, NullWritable, IntWritable>
      createVertexReader(InputSplit split, TaskAttemptContext context)
          throws IOException {
        return new EmployeeRDFVertexReader(
            textInputFormat.createRecordReader(split, context));
      }
  2. We write the custom vertex reader used in the input format as a static inner class:
      public static class EmployeeRDFVertexReader extends
          TextVertexInputFormat.TextVertexReader<Text, IntWritable, NullWritable, IntWritable> {
    
        private static final Pattern TAB = Pattern.compile("[\t]");
        private static final Pattern COLON = Pattern.compile("[:]");
        private static final Pattern COMMA = Pattern.compile("[,]");
    
    
        public EmployeeRDFVertexReader(RecordReader<LongWritable, Text> lineReader) {
          super(lineReader);
        }
  3. Override the getCurrentVertex() method. This method is where we use the line reader to parse our custom vertex objects:
        @Override
        public Vertex<Text, IntWritable, NullWritable, IntWritable>
        getCurrentVertex() throws IOException, InterruptedException {
          Vertex<Text, IntWritable, NullWritable, IntWritable> vertex =
              BspUtils.<Text, IntWritable, NullWritable, IntWritable>
                  createVertex(getContext().getConfiguration());
    
          // The first column holds the vertex ID; the third column holds a
          // colon-delimited field whose value is a comma-separated list of
          // subordinates, or "none" if the employee has no direct reports.
          String[] tokens = TAB.split(getRecordReader()
              .getCurrentValue().toString());
          Text vertexId = new Text(tokens[0]);

          IntWritable value = new IntWritable(0);
          String subtoken = COLON.split(tokens[2])[1];
          String[] subs = COMMA.split(subtoken);
          Map<Text, NullWritable> edges =
              Maps.newHashMapWithExpectedSize(subs.length);
          for (String sub : subs) {
            if (!sub.equals("none"))
              edges.put(new Text(sub), NullWritable.get());
          }
    
          vertex.initialize(vertexId, value, edges, null);
    
          return vertex;
        }
    
        @Override
        public boolean nextVertex() throws IOException, InterruptedException {
          return getRecordReader().nextKeyValue();
        }
      }
    }
  4. The job setup code, vertex class, and custom output format are all contained in a single class. Save the following code in a package of your choice to a class named EmployeeShortestPath.java:
    import org.apache.giraph.graph.*;
    import org.apache.giraph.lib.TextVertexOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * Computes each vertex's value as its number of hops from the source.
     * Vertices receiving incoming messages increment the message value and
     * forward it along their edges.
     */
    public class EmployeeShortestPath implements Tool {
    
        public static final String NAME = "emp_shortest_path";
    
        private Configuration conf;
        private static final String SOURCE_ID = "emp_source_id";
    
        public EmployeeShortestPath(Configuration configuration) {
            conf = configuration;
        }
  5. The run() method in the following code snippet sets up the Giraph job configuration:
        @Override
        public int run(String[] args) throws Exception {
            if (args.length < 4) {
                System.err.println(printUsage());
                System.exit(1);
            }
            if (args.length > 4) {
                System.err.println("too many arguments. " +
                        "Did you forget to quote the source ID ('firstname lastname')?");
                System.exit(1);
            }
            String input = args[0];
            String output = args[1];
            String source_id = args[2];
            String zooQuorum = args[3];
    
            conf.set(SOURCE_ID, source_id);
            conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
            conf.setBoolean(GiraphJob.USE_SUPERSTEP_COUNTERS, false);
            conf.setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
            GiraphJob job = new GiraphJob(conf,
                    "single-source shortest path for employee: " + source_id);
            job.setVertexClass(EmployeeShortestPathVertex.class);
            job.setVertexInputFormatClass(EmployeeRDFTextInputFormat.class);
            job.setVertexOutputFormatClass(EmployeeShortestPathOutputFormat.class);
            job.setZooKeeperConfiguration(zooQuorum);
    
            FileInputFormat.addInputPath(job.getInternalJob(), new Path(input));
            FileOutputFormat.setOutputPath(job.getInternalJob(), removeAndSetOutput(output));
    
            job.setWorkerConfiguration(1, 1, 100.0f);   
            return job.run(true) ? 0 : 1;
        }
  6. The following method force-deletes the supplied output folder in HDFS; use it with caution. The other methods are required to conform to the Tool interface:
        private Path removeAndSetOutput(String outputDir) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(outputDir);
            fs.delete(path, true);
            return path;
        }
    
        private String printUsage() {
            return "usage: <input> <output> <single quoted source_id> <zookeeper_quorum>";
        }
    
        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        @Override
        public Configuration getConf() {
            return conf;
        }
  7. The main() method instantiates and submits the job using ToolRunner:
        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new EmployeeShortestPath(new Configuration()), args));
        }
  8. The static inner class EmployeeShortestPathVertex lets us define a custom compute() method to be used during each superstep:
        public static class EmployeeShortestPathVertex extends
                EdgeListVertex<Text, IntWritable, NullWritable, IntWritable> {
    
            private IntWritable max = new IntWritable(Integer.MAX_VALUE);
            private IntWritable msg = new IntWritable(1);
    
            private boolean isSource() {
                return getId().toString().equals(
                        getConf().get(SOURCE_ID));
            }
    
            @Override
            public void compute(Iterable<IntWritable> messages) throws IOException {
                if (getSuperstep() == 0) {
                    setValue(max);
                    if (isSource()) {
                        // Tell each direct subordinate it is one hop from the source.
                        for (Edge<Text, NullWritable> e : getEdges()) {
                            sendMessage(e.getTargetVertexId(), msg);
                        }
                    }
                }
                int min = getValue().get();
                for(IntWritable msg : messages) {
                    min = Math.min(msg.get(), min);
                }
                if(min < getValue().get()) {
                    setValue(new IntWritable(min));
                    msg.set(min + 1);
                    sendMessageToAllEdges(msg);
                }
                voteToHalt();
            }
        }
  9. The static inner class EmployeeShortestPathOutputFormat defines a custom OutputFormat, and its inner class EmployeeRDFVertexWriter writes each vertex's information back to HDFS as Text key-value pairs:
        public static class EmployeeShortestPathOutputFormat extends
                TextVertexOutputFormat<Text, IntWritable, NullWritable> {

            private static class EmployeeRDFVertexWriter
                    extends TextVertexWriter<Text, IntWritable, NullWritable> {
    
                private Text valOut = new Text();
    
                public EmployeeRDFVertexWriter(
                        RecordWriter<Text, Text> lineRecordWriter) {
                    super(lineRecordWriter);
                }
    
                @Override
                public void writeVertex(
                        Vertex<Text, IntWritable, NullWritable, ?> vertex)
                        throws IOException, InterruptedException {

                    valOut.set(vertex.getValue().toString());
                    if (vertex.getValue().get() == Integer.MAX_VALUE)
                        valOut.set("no path");
                    getRecordWriter().write(vertex.getId(), valOut);
                }
    
    
            }
    
            @Override
            public VertexWriter<Text, IntWritable, NullWritable>
            createVertexWriter(TaskAttemptContext context)
                    throws IOException, InterruptedException {
                RecordWriter<Text, Text> recordWriter =
                        textOutputFormat.getRecordWriter(context);
                return new EmployeeRDFVertexWriter(recordWriter);
            }
        }
    }
  10. Create the shell script run_employee_shortest_path.sh using the commands listed in the following code snippet. Change GIRAPH_PATH to match your local path to the Giraph JAR file, and change JAR_PATH to match your local path to the custom JAR file containing the compiled code from the previous steps.

    Note

    To use the alias emp_shortest_path, your custom JAR file must use the Hadoop Driver class as its main class.

    GIRAPH_PATH=lib/giraph/giraph-0.2-SNAPSHOT-jar-with-dependencies.jar
    HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$GIRAPH_PATH
    JAR_PATH=dist/employee_examples.jar
    export HADOOP_CLASSPATH
    hadoop jar $JAR_PATH emp_shortest_path -libjars $GIRAPH_PATH,$JAR_PATH /input/gooftech /output/gooftech 'Shanae Dailey' localhost:2181
  11. Run run_employee_shortest_path.sh. The job should be submitted to the Hadoop cluster. Under /output/gooftech there should be a single part file that lists the minimum number of hops required to reach each employee from the source ID, or no path if the employee is not reachable; a few illustrative lines are shown below.
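
    The output lines are tab-separated employee IDs and hop counts. The names and values below are purely illustrative, not actual records from gooftech.tsv:

    jill fletcher    1
    bob smith        2
    carl davis       no path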

How it works...

We start with the custom input format. The Giraph API offers TextVertexInputFormat, which wraps TextInputFormat and LineReader to read vertices stored one per line in a text file. Currently, the Giraph API requires your records to be sorted in order of the vertex ID. Our employee dataset is sorted by firstname/lastname, so we satisfy this requirement and can move forward. In order to create meaningful vertices from our RDF data, we subclass TextVertexInputFormat to create EmployeeRDFTextInputFormat. To control exactly how our vertices are built, we subclass TextVertexReader to create EmployeeRDFVertexReader, which allows us to override the createVertexReader() method in our custom input format to return an instance of our own reader subclass. The reader delegates to an instance of the Hadoop LineReader and is responsible for creating the vertices from the text lines seen in each input split. From here we can override getCurrentVertex() and create individual vertices from each incoming RDF triple seen by the line reader. By extending TextVertexReader we don't have to worry about manually controlling the invocation of getCurrentVertex() for each line; the framework handles this for us. We simply need to tell the framework how to turn each line of text into a vertex with one or more edges.

The generic type parameters declared in the definition of EmployeeRDFTextInputFormat are repeatedly seen in the code. From left to right, they provide the concrete type information for the vertex ID class, vertex value class, edge value class, and message class. A quick look at the parent class shows the following generic header:

public abstract class TextVertexInputFormat<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable>
    extends VertexInputFormat<I, V, E, M> 

All four of the generic types must be Writable. The vertex ID class must be WritableComparable. Currently, Giraph does not support other serialization frameworks.
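
For reference, here is how the four type parameters resolve in this recipe; this simply restates the class declarations shown earlier:

// I = Text         -> the vertex ID ("firstname lastname")
// V = IntWritable  -> the vertex value (minimum hops from the source)
// E = NullWritable -> the edge value (no per-edge data is needed)
// M = IntWritable  -> the message type (candidate hop counts)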

Our getCurrentVertex() method implementation is very basic. We set up several static final regex patterns to split the RDF triples properly. The firstname/lastname combination becomes our vertex ID, stored as a Text instance. Each vertex is initialized with a vertex value of 0, stored as an IntWritable. Each subordinate listed in the comma-delimited list becomes the target of an edge; however, we don't need any value information for each edge, and thus NullWritable suffices for the edge value. For this particular job, our message type is IntWritable. This class is reused in the next recipe in this chapter, Using Apache Giraph to perform a distributed breadth-first search. For the sake of brevity, the input format is only explained once here.
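
The dataset itself is not reproduced here, but the parsing logic dictates the shape of each line: three tab-separated columns, where the first holds the employee name, the middle (predicate) column is ignored by the reader, and the third holds a colon-delimited field listing subordinates, or none. A hypothetical pair of lines (the names and column labels are invented for illustration) would be:

alice jones<TAB>relationship<TAB>subordinates:bob smith,carl davis
carl davis<TAB>relationship<TAB>subordinates:none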

Next, we set up our job class. The job setup borrows heavily from the Hadoop MapReduce Java API. We implement the Tool interface and define four arguments to read from the command line. This job requires an input folder in HDFS, an output folder to write back to HDFS, a source ID from which to compute the single-source shortest path, and a ZooKeeper quorum to manage the job state. We also need to set a few other parameters, since we are testing against a pseudo-distributed cluster with limited resources.

conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
conf.setBoolean(GiraphJob.USE_SUPERSTEP_COUNTERS, false);
conf.setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);

SPLIT_MASTER_WORKER tells Giraph whether or not the master process runs on a different host than the workers. By default, this is set to true, but since we are on a pseudo-distributed single-node setup, it needs to be false. Turning off superstep counters limits the verbosity of the MapReduce WebUI for our job. This can be handy when testing jobs involving hundreds or potentially thousands of supersteps. Lastly, we turn off checkpointing to tell Giraph that we do not care about backing up the graph state at any superstep. This is acceptable because we are only testing and are interested in rapid job execution; in a production job, it is recommended to checkpoint your graph state regularly at the cost of a slower overall job runtime. We then instantiate a GiraphJob, passing it our configuration instance along with a somewhat descriptive title for the job.
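
For a production run, you might re-enable checkpointing by setting a non-zero frequency; the interval below is only an example value:

// Hypothetical production setting: back up the graph state every 5
// supersteps so a failed job can restart from the last checkpoint.
conf.setInt(GiraphJob.CHECKPOINT_FREQUENCY, 5);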

The next three lines of code are critical for the Giraph job to execute properly on your cluster.

job.setVertexClass(EmployeeShortestPathVertex.class);
job.setVertexInputFormatClass(EmployeeRDFTextInputFormat.class);
job.setVertexOutputFormatClass(EmployeeShortestPathOutputFormat.class);

The first line tells Giraph about our custom Vertex implementation that encapsulates each vertex in the graph. This houses the application-specific compute() function called at each superstep. We extend the base class EdgeListVertex to leverage some pre-existing code for message handling, edge iteration, and member serialization.

Then, we set the ZooKeeper quorum and define a single worker to hold the graph partition. If your pseudo-distributed cluster can support multiple workers (multiple concurrent map JVMs), then feel free to increase this limit. Just remember to leave one free map slot for the master process. Finally, we are ready to submit the job to the cluster.
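
For example, if your pseudo-distributed cluster had three map slots available, a configuration along the following lines (the numbers are illustrative) would run two workers and leave a slot free for the master:

// minimum workers, maximum workers, and the percentage of workers
// that must respond before the computation proceeds
job.setWorkerConfiguration(2, 2, 100.0f);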

After the InputFormat handles creating the vertices from the different input splits, each vertex's compute() function gets invoked. We define the static inner class EmployeeShortestPathVertex to override the compute() function and implement the business logic necessary to calculate the shortest path. Specifically, we are interested in the minimum number of hops required to navigate from the source vertex to every other vertex connected by one or more pathways in the graph, or no path if the target vertex is not reachable from the source.

First superstep (S0)

At S0, the function immediately enters the first conditional statement and initializes every vertex value to the maximum possible integer value. As incoming messages are received, each vertex compares the integer contained in each message against the currently held minimum to see if it represents a lower value, so the business logic is made a bit easier by initializing the minimum to the maximum possible value for the datatype. During the first superstep, it is critical that the source vertex sends a message along each of its edges, telling the vertex at the other end that it is one hop away from the source. To do this, we define a member instance msg used just for messaging. It is reset and reused every time the vertex needs to send a message, which helps to avoid unnecessary instantiation.

We need to compare any incoming messages with the currently held minimum-hops value to see if we need to update it and notify our edges. Since we are only at S0, there are no messages, so the value remains Integer.MAX_VALUE. Because the minimum value does not change, we skip the last conditional branch.

At the end of each superstep for our job, we always invoke voteToHalt(). The Giraph framework automatically reactivates any vertex that has incoming messages at the next superstep, but we want to deactivate vertices that are temporarily done sending and receiving messages. Once there are no more messages to process by any vertex in the graph, the job will stop reactivating vertices and will consider itself finished.
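
Conceptually, the halting behavior follows the sketch below; this is pseudocode illustrating the BSP contract, not actual Giraph source:

// while any vertex is active or any message is still in flight:
//   1. deliver the messages produced in the previous superstep,
//      reactivating any halted vertex that received one
//   2. invoke compute() on every active vertex
//      (our compute() always ends by calling voteToHalt())
// once all vertices are halted and no messages remain, the job ends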

Second superstep (S1)

After the previous superstep, every single vertex in the graph voted to halt the execution. The only vertex that messaged its edges was the source vertex, so the framework reactivates only the vertices on the receiving end of the source's edges. The source vertex told each of these vertices that it is one hop away; one is less than Integer.MAX_VALUE, so it immediately becomes the new vertex value. Each vertex receiving the message then turns around and notifies its own edges that they are min + 1 hops away from the source, and the cycle continues.

Should any vertex receive a message lower than its current value, that indicates there is a path from the source ID to that vertex involving fewer hops, and we need to re-notify each edge connected to the current vertex.

Eventually, every vertex will know its minimum distance and no more messages will be sent at the current superstep N. When starting superstep N + 1, there will be no vertices that need to be reactivated to process incoming messages, and the overall job will finish. Now we need to output each vertex's current value denoting the minimum number of hops from the source vertex.
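
To make the message flow concrete, consider a hypothetical three-employee chain in which A manages B and B manages C, with A as the source ID:

// S0: A, B, C set value = Integer.MAX_VALUE; A sends "1" to B; all halt
// S1: B wakes with message 1, sets value = 1, sends "2" to C, halts
// S2: C wakes with message 2, sets value = 2, has no out-edges, halts
// S3: no active vertices and no messages in flight; the job completes
// Output: B = 1, C = 2; A itself never receives a message in an acyclic
//         graph, so its value stays at Integer.MAX_VALUE ("no path")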

To write the vertex value information back to HDFS as text, we implement a static inner subclass of TextVertexOutputFormat named EmployeeShortestPathOutputFormat. This follows a similar inheritance/delegation pattern as our custom InputFormat defined earlier, except instead of delegating to a custom RecordReader, we use a custom RecordWriter. We set a Text member variable valOut to reuse while outputting the integer values as strings. The framework automatically handles invoking writeVertex() for each vertex contained in our dataset.

If the current vertex value is still equal to Integer.MAX_VALUE, we know that the vertex never received any incoming messages, which implies it is not reachable from the source vertex. Otherwise, we output the minimum number of hops required to traverse to the current vertex ID from the source ID.

See also

  • Using Apache Giraph to perform a distributed breadth-first search