Using Apache Giraph to perform a distributed breadth-first search

In this recipe, we will use the Apache Giraph API to implement a distributed breadth-first search to determine if two employees are connected in the company's network via one or more pathways. The code will rely on message passing between employee vertices to determine if a vertex is reachable.

Getting ready

Make sure you have a basic familiarity with Google Pregel/BSP and the Giraph API.

You will need access to a pseudo-distributed Hadoop cluster. The code listed in this recipe disables the split master/worker setting, which is convenient for pseudo-distributed testing but not ideal in fully-distributed environments. It also assumes familiarity with bash shell scripting.

You will need to load the example dataset gooftech.tsv to an HDFS folder located at /input/gooftech.

You will also need to package this code inside a JAR file to be executed by the Hadoop JAR launcher from the shell. The shell script listed in the recipe will show a template for job submission with the correct classpath dependencies.

How to do it...

Carry out the following steps to perform a breadth-first search in Giraph:

  1. Implement EmployeeRDFTextInputFormat.java. See steps 1 to 3 in the How to do it… section of the Single-source shortest-path with Apache Giraph recipe.
  2. The job setup code, vertex class, and custom output format are all contained in a single class. Save the following code in a package of your choice to a class with the name EmployeeBreadthFirstSearch.java:
    import org.apache.giraph.graph.*;
    import org.apache.giraph.lib.TextVertexOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * Start with specified employee, mark the target if message is received
     */
    public class EmployeeBreadthFirstSearch implements Tool{
    
        public static final String NAME = "emp_breadth_search";
    
        private Configuration conf;
        private static final String SOURCE_ID = "emp_src_id";
        private static final String DEST_ID = "emp_dest_id";
    
        public EmployeeBreadthFirstSearch(Configuration configuration) {
            conf = configuration;
        }
  3. The run() method in the following code sets up the Giraph job configuration:
        @Override
        public int run(String[] args) throws Exception {
            if(args.length < 5) {
                System.err.println(printUsage());
                System.exit(1);
            }
            if(args.length > 5) {
                System.err.println("too many arguments. " +"Did you forget to quote the source or destination ID name ('firstname lastname')");
                System.exit(1);
            }
            String input = args[0];
            String output = args[1];
            String source_id = args[2];
            String dest_id = args[3];
            String zooQuorum = args[4];
    
            conf.set(SOURCE_ID, source_id);
            conf.set(DEST_ID, dest_id);
            conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, 
                            false);
            conf.setBoolean(GiraphJob.USE_SUPERSTEP_COUNTERS, 
                            false);
            conf.setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
            GiraphJob job = new GiraphJob(conf, "determine connectivity between " + source_id + " and " + dest_id);
            job.setVertexClass(EmployeeSearchVertex.class); 
            job.setVertexInputFormatClass(EmployeeRDFTextInputFormat.class);
            job.setVertexOutputFormatClass(BreadthFirstTextOutputFormat.class);
            job.setZooKeeperConfiguration(zooQuorum);
    
            FileInputFormat.addInputPath(job.getInternalJob(), 
                                         new Path(input));
            FileOutputFormat.setOutputPath(job.getInternalJob(), 
                                   removeAndSetOutput(output));
    
            job.setWorkerConfiguration(1, 1, 100.0f);   
    
            if(job.run(true)) {
                long srcCounter = job.getInternalJob().getCounters()
                        .getGroup("Search").findCounter("Source Id found").getValue();
                long dstCounter = job.getInternalJob().getCounters()
                        .getGroup("Search").findCounter("Dest Id found").getValue();
                if(srcCounter == 0 || dstCounter == 0) {
                     System.out.println("Source and/or Dest Id not found in dataset. Check your arguments.");
                }
                return 0;
            } else {
                return 1;
            }
        }
  4. The following method force-deletes the supplied output folder in HDFS; use it with caution. The other methods are required to conform to the Tool interface:
        private Path removeAndSetOutput(String outputDir) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(outputDir);
            fs.delete(path, true);
            return path;
        }
    
        private String printUsage() {
            return "usage: <input> <output> <single quoted source_id> <single quoted dest_id> <zookeeper_quorum>";
        }
    
        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        @Override
        public Configuration getConf() {
            return conf;
        }
  5. The main() method instantiates and submits the job using ToolRunner:
        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new EmployeeBreadthFirstSearch(new Configuration()), args));
        }
  6. The static inner class, EmployeeSearchVertex, lets us define a custom compute method to be used during each superstep:
        public static class EmployeeSearchVertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends EdgeListVertex<Text, IntWritable, NullWritable, IntWritable> {
    
            private IntWritable msg = new IntWritable(1);
    
            private boolean isSource() {
                return getId().toString().equals(
                        getConf().get(SOURCE_ID));
            }
    
            private boolean isDest() {
                return getId().toString().equals(
                        getConf().get(DEST_ID));
            }
    
            @Override
            public void compute(Iterable<IntWritable> messages) throws IOException {
                if(getSuperstep() == 0) {
                    if(isSource()) {
                        getContext().getCounter("Search", "Source Id found").increment(1);
                        sendMessageToAllEdges(msg);
                    }  else if(isDest()){
                        getContext().getCounter("Search", "Dest Id found").increment(1l);
                    }
                }
                boolean connectedToSourceId = false;
                for(IntWritable message : messages) {
                    if(isDest()) {
                        setValue(message);
                    }
                    connectedToSourceId = true;
                }
                if(connectedToSourceId)
                    sendMessageToAllEdges(msg);
                voteToHalt();
            }
        }
  7. The static inner class, BreadthFirstTextOutputFormat, lets us define a custom OutputFormat that outputs our vertex information as Text key-value pairs back to HDFS:
        public static class BreadthFirstTextOutputFormat extends
                TextVertexOutputFormat <Text, IntWritable, NullWritable> {
    
            private static class EmployeeRDFVertexWriter
                    extends TextVertexWriter <Text, IntWritable, NullWritable> {
    
                private Text valOut = new Text();
                private String sourceId = null;
                private String destId = null;
    
                public EmployeeRDFVertexWriter(
                        String sourceId, String destId, RecordWriter<Text, Text> lineRecordWriter) {
                    super(lineRecordWriter);
                    this.sourceId = sourceId;
                    this.destId = destId;
                }
    
                @Override
                public void writeVertex(
                        Vertex<Text, IntWritable, NullWritable, ?> vertex)
                        throws IOException, InterruptedException {
    
                    if(vertex.getId().toString().equals(destId)) {
                        if(vertex.getValue().get() > 0) {
                            getRecordWriter().write(new Text(sourceId + " is connected to " + destId), new Text(""));
                        } else {
                            getRecordWriter().write(new Text(sourceId + " is not connected to " + destId), new Text(""));
                        }
                    }
                }
            }
    
            @Override
            public VertexWriter<Text, IntWritable, NullWritable>
            createVertexWriter(TaskAttemptContext context)
                    throws IOException, InterruptedException {
                RecordWriter<Text, Text> recordWriter =
                        textOutputFormat.getRecordWriter(context);
                String sourceId = context.getConfiguration().get(SOURCE_ID);
                String destId = context.getConfiguration().get(DEST_ID);
                return new EmployeeRDFVertexWriter(sourceId, destId, recordWriter);
            }
        }
    
    }
  8. Create the shell script run_employee_connectivity_search.sh using the commands listed in the following code snippet. Change GIRAPH_PATH to match your local path to the Giraph JAR file, and change JAR_PATH to match the local path to the custom JAR file you compiled from the preceding code.

    Note

    To use the alias emp_breadth_search, your custom JAR file must register the alias in a Hadoop driver class that is set as the main class in the JAR manifest; a sketch of such a driver appears after these steps.

    GIRAPH_PATH=lib/giraph/giraph-0.2-SNAPSHOT-jar-with-dependencies.jar
    HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$GIRAPH_PATH
    JAR_PATH=dist/employee_examples.jar
    export HADOOP_CLASSPATH
    hadoop jar $JAR_PATH emp_breadth_search -libjars $GIRAPH_PATH,$JAR_PATH /input/gooftech /output/gooftech 'Valery Dorado' 'Gertha Linda' localhost:2181
  9. Run run_employee_connectivity_search.sh. You should see the job submitted to the Hadoop cluster. Upon successful completion, you should see a single part file under /output/gooftech saying Valery Dorado is not connected to Gertha Linda.
  10. Open run_employee_connectivity_search.sh. Change the source ID to Shoshana Gatton. Save and close the script.
  11. Run run_employee_connectivity_search.sh. The output should now be Shoshana Gatton is connected to Gertha Linda.
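
The Note in step 8 refers to a Hadoop driver class that exposes the emp_breadth_search alias. The following is a minimal sketch of such a driver using Hadoop's ProgramDriver; the class name EmployeeExamplesDriver is an assumption made for this example and is not part of the recipe's code:

    import org.apache.hadoop.util.ProgramDriver;

    /**
     * Hypothetical driver class: set it as the Main-Class in the JAR manifest
     * so that "hadoop jar employee_examples.jar emp_breadth_search ..." resolves.
     * Assumes it lives in the same package as EmployeeBreadthFirstSearch.
     */
    public class EmployeeExamplesDriver {

        public static void main(String[] args) {
            int exitCode = -1;
            ProgramDriver driver = new ProgramDriver();
            try {
                // Map the alias to the job class; ProgramDriver invokes its main() method
                driver.addClass(EmployeeBreadthFirstSearch.NAME,
                        EmployeeBreadthFirstSearch.class,
                        "Determine whether two employees are connected");
                driver.driver(args);
                exitCode = 0;
            } catch (Throwable t) {
                t.printStackTrace();
            }
            System.exit(exitCode);
        }
    }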

How it works...

To understand how the custom InputFormat and job setup work, check out the How it works… section of the Single-source shortest-path with Apache Giraph recipe. This recipe uses exactly the same input format and the same job setup, except for the following differences:

  • The job requires an additional DEST_ID argument to be supplied by the command line.
  • The Vertex implementation is EmployeeSearchVertex.
  • The OutputFormat subclass is set to the static inner class BreadthFirstTextOutputFormat. This is explained in more detail later in this section.
  • We use counters during the job execution to determine if the supplied source/destination IDs are found in the dataset.

The compute() function inside EmployeeSearchVertex is where we take advantage of Giraph message passing to determine reachability. During superstep 0, the source vertex sends a message along each of its edges. If the supplied source and destination IDs are found among the dataset vertices, we increment counters to let the user know; this helps to quickly spot incorrectly entered command-line arguments for the source/destination vertex IDs. After the first superstep, both counters should be set to 1. We define a private member variable msg that is set to 1. The actual numeric content of the message is never used, but by keeping the vertex value as IntWritable we can reuse the custom InputFormat EmployeeRDFTextInputFormat built earlier. If a vertex receives one or more messages during any superstep, it forwards a message along each of its edges. If the destination vertex ever receives a message, we set its value to the integer 1 contained in the message. By the end of the job, the destination vertex therefore holds either the value 1, meaning it is connected by one or more edge paths to the source vertex, or its initial value of 0, meaning it never received a message and is not reachable from the source.

We define the static inner class BreadthFirstTextOutputFormat to handle the output formatting. This follows a similar inheritance/delegation pattern to our custom InputFormat defined earlier, except that instead of delegating to a custom RecordReader, we delegate to a custom RecordWriter. When we instantiate our TextVertexWriter subclass, EmployeeRDFVertexWriter, we pass it references to the configured source and destination vertex IDs. The framework then calls the writeVertex() method for each vertex in our dataset. For this job, we are only interested in printing whether or not the source vertex is connected by one or more paths to the destination vertex. If the vertex currently being processed is the destination vertex, we print one of two strings. If the vertex value is greater than 0, the destination must have received one or more messages, which is only possible if there is at least one path of edges between the source and destination. Otherwise, if the value of the destination vertex is still 0, we can safely assume it is not reachable from the source. For just one source-destination pair, as we have in this recipe, we could have placed this business logic directly in the job class and used counters after the execution finished, but this design is more extensible should we want to use this code to query multiple source-destination vertex pairs.
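
As a rough illustration of that extensibility, the vertex writer could accept several destination IDs at once. The following is a minimal sketch built on assumptions that are not part of the recipe: DEST_ID is treated as a comma-separated list, the class name MultiDestVertexWriter is invented for this example, and compute() would also need to set the value on every vertex that receives a message rather than only on the single destination:

    // Additional imports needed: java.util.HashSet and java.util.Set
    private static class MultiDestVertexWriter
            extends TextVertexWriter<Text, IntWritable, NullWritable> {

        private final String sourceId;
        private final Set<String> destIds = new HashSet<String>();

        public MultiDestVertexWriter(String sourceId, String destIdList,
                                     RecordWriter<Text, Text> lineRecordWriter) {
            super(lineRecordWriter);
            this.sourceId = sourceId;
            // DEST_ID is assumed to hold a comma-separated list of vertex IDs
            for (String id : destIdList.split(",")) {
                destIds.add(id.trim());
            }
        }

        @Override
        public void writeVertex(Vertex<Text, IntWritable, NullWritable, ?> vertex)
                throws IOException, InterruptedException {
            // Emit one verdict line per destination vertex encountered in the graph
            if (destIds.contains(vertex.getId().toString())) {
                String verdict = vertex.getValue().get() > 0
                        ? " is connected to " : " is not connected to ";
                getRecordWriter().write(
                        new Text(sourceId + verdict + vertex.getId().toString()),
                        new Text(""));
            }
        }
    }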

There's more...

Programs designed using the Hadoop MapReduce API usually require some additional tuning once you begin testing at scale. It is not uncommon to completely re-evaluate a chosen design pattern that simply does not scale. Working with the Giraph API requires the same diligence and patience.

Apache Giraph jobs often require scalability tuning

The need for scalability tuning is not always easy to spot initially. You may have a relatively small graph that operates very well within a given BSP design approach. Suddenly you hit scale and notice all sorts of errors you never planned for. Try to keep your compute() function small to avoid complications and aid with troubleshooting. At the time of this writing, Giraph workers will attempt to hold their assigned graph partitions directly in memory, so minimizing the vertex memory footprint is of the utmost importance. Moreover, many people have to tune their message-passing settings using the parameters located at the top of GiraphJob. You can control the number of messaging threads used by each worker to communicate with other workers by setting MSG_NUM_FLUSH_THREADS. By default, Giraph lets each worker open a communication thread to every other worker in the job; for many Hadoop clusters, this is not sustainable. Also consider adjusting the maximum number of messages allowed to be flushed in bulk using MAX_MESSAGES_PER_FLUSH_PUT; the default value of 2000 may not be adequate for your job.
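
As a minimal sketch, these settings can be applied on the job Configuration the same way the recipe already sets CHECKPOINT_FREQUENCY inside run(); the numeric values below are illustrative starting points for experimentation, not recommendations:

    // Hedged sketch: place inside run(), before constructing the GiraphJob
    conf.setInt(GiraphJob.MSG_NUM_FLUSH_THREADS, 8);          // cap messaging threads per worker
    conf.setInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT, 5000);  // messages flushed in bulk per put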
