Using Protocol Buffers to serialize data

Protocol Buffers is a cross-language data serialization format. It uses an interface definition file to generate bindings in many languages, including Java.

This recipe will demonstrate how to define a Protocol Buffers message, generate the corresponding Java bindings, and use these bindings to serialize a Java object to HDFS using MapReduce.

Getting ready

You will need to download, compile, and install Protocol Buffers (the protoc compiler and its runtime library). This recipe also uses Twitter's Elephant Bird library, which provides the LzoProtobufBlockOutputFormat and ProtobufWritable classes used in the code that follows.

Note

You will need the GNU C/C++ compiler collection installed because we will be compiling the Protocol Buffers source code.

To install GNU C/C++ using Yum, run the following command as the root user from a bash shell:

# yum install gcc gcc-c++ autoconf automake

To compile and install Protocol Buffers, type the following lines of code:

$ cd /path/to/protobuf
$ ./configure
$ make
$ make check
# make install
# ldconfig

How to do it...

  1. Set up the directory structure.
    $ mkdir test-protobufs
    $ mkdir test-protobufs/src
    $ mkdir test-protobufs/src/proto
    $ mkdir test-protobufs/src/java
    $ cd test-protobufs/src/proto
  2. Next, create the protocol format.
    package example;
    
    option java_package = "com.packt.hadoop.hdfs.ch2";
    option java_outer_classname = "WeblogRecord";
    
    message Record {
      optional string cookie = 1;
      required string page = 2;
      required int64 timestamp = 3;
      required string ip = 4;
    }

    Save the file as weblog_record.proto in the test-protobufs/src/proto/ folder.

  3. Compile the protocol format from the test-protobufs folder. WeblogRecord.java is generated in src/java/ by protoc:
    $ cd ../../
    $ protoc --proto_path=src/proto/ --java_out=src/java/ src/proto/weblog_record.proto
  4. Now, we will write a MapReduce application to read weblog_entries.txt from HDFS and use Elephant Bird's LzoProtobufBlockOutputFormat class to serialize the WeblogRecord object to an LZO compressed file:
    public class ProtobufMapper extends Mapper<Object, Text, NullWritable, ProtobufWritable<WeblogRecord.Record>> {
    
        private ProtobufWritable<WeblogRecord.Record> protobufRecord = ProtobufWritable.newInstance(WeblogRecord.Record.class);
        private SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");
        
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // weblog_entries.txt is tab-delimited: cookie, page, date, time, ip
            String[] tokens = value.toString().split("\t");
            String cookie = tokens[0];
            String page = tokens[1];
            String date = tokens[2];
            String time = tokens[3];
            String formattedDate = date + ":" + time;
            Date timestamp = null;
            try {
                timestamp = dateFormatter.parse(formattedDate);
            } catch (ParseException ex) {
                // skip records with an unparseable timestamp
                return;
            }
            String ip = tokens[4];
            protobufRecord.set(WeblogRecord.Record.newBuilder()
                    .setCookie(cookie)
                    .setPage(page)
                    .setTimestamp(timestamp.getTime())
                    .setIp(ip)
                    .build());
            context.write(NullWritable.get(), protobufRecord);
        }
    }
  5. Finally, we will configure the MapReduce job.
    public class ProtobufWriter extends Configured implements Tool {
        
        public int run(String[] args) throws Exception {
            
            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);
            
            Configuration conf = getConf();
            Job weblogJob = new Job(conf);
            weblogJob.setJobName("ProtobufWriter");
            weblogJob.setJarByClass(getClass());
            // map-only job: no reducers, so the mapper's output types are the job's output types
            weblogJob.setNumReduceTasks(0);
            weblogJob.setMapperClass(ProtobufMapper.class);
            weblogJob.setMapOutputKeyClass(NullWritable.class);
            weblogJob.setMapOutputValueClass(ProtobufWritable.class);
            weblogJob.setOutputKeyClass(NullWritable.class);
            weblogJob.setOutputValueClass(ProtobufWritable.class);
            weblogJob.setInputFormatClass(TextInputFormat.class);
            weblogJob.setOutputFormatClass(LzoProtobufBlockOutputFormat.class);
            
            FileInputFormat.setInputPaths(weblogJob, inputPath);
            LzoProtobufBlockOutputFormat.setClassConf(WeblogRecord.Record.class, weblogJob.getConfiguration());
            LzoProtobufBlockOutputFormat.setOutputPath(weblogJob, outputPath);
            
            if (weblogJob.waitForCompletion(true)) {
                return 0;
            }
            return 1;
        }
        
        public static void main(String[] args) throws Exception {
            int returnCode = ToolRunner.run(new ProtobufWriter(), args);
            System.exit(returnCode);
        }
    }

How it works...

The first task is to define and compile a Protocol Buffers message definition. This definition file can be used to generate bindings in any language the Protocol Buffers compiler supports. There are a few things to note about the format of the message.

First, the package definition package example; is not related to Java packages; it is the namespace of the message defined in the *.proto file. Second, the option java_package declaration sets the Java package of the generated bindings. Finally, the option java_outer_classname declaration sets the name of the generated outer class; the Record message is generated as a nested class inside this WeblogRecord outer class.
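
Once protoc has generated WeblogRecord.java, the nested Record class can be built and serialized with the standard generated Protocol Buffers API, independently of Hadoop. The following standalone sketch shows a round trip through the binary wire format; the class name WeblogRecordRoundTrip and the field values are illustrative only:

import com.packt.hadoop.hdfs.ch2.WeblogRecord;

public class WeblogRecordRoundTrip {
    public static void main(String[] args) throws Exception {
        // Build a Record using the builder generated by protoc
        WeblogRecord.Record record = WeblogRecord.Record.newBuilder()
                .setCookie("A1B2C3")             // optional field
                .setPage("/index.html")
                .setTimestamp(1325376000000L)    // milliseconds since the epoch
                .setIp("127.0.0.1")
                .build();

        // Serialize to the compact Protocol Buffers binary format
        byte[] bytes = record.toByteArray();

        // Deserialize and verify the round trip
        WeblogRecord.Record copy = WeblogRecord.Record.parseFrom(bytes);
        System.out.println(copy.getPage() + " " + copy.hasCookie());
    }
}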

Next, we wrote a MapReduce application to serialize the WeblogRecord object generated by the Protocol Buffers compiler. To set up the MapReduce job, we set the input format to read a normal text file.

 weblogJob.setInputFormatClass(TextInputFormat.class);

Then, the output format was set to store the records produced from the job in the Protocol Buffers block format, compressed using LZO.

LzoProtobufBlockOutputFormat.setClassConf(WeblogRecord.Record.class, weblogJob.getConfiguration());
LzoProtobufBlockOutputFormat.setOutputPath(weblogJob, outputPath);

In the mapper, we use the ProtobufWritable class of Elephant Bird to wrap the WeblogRecord.Record object. The ProtobufWritable class implements Hadoop's WritableComparable interface, which every key emitted by a MapReduce job must implement. Because ProtobufWritable can wrap any message binding generated by protoc, it saves us from writing a custom WritableComparable implementation for each new message type.
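
The following fragment is a rough sketch of what the wrapper does outside of MapReduce, assuming (as in Elephant Bird's BinaryWritable API) that the wrapped message is retrieved with get(); record here is a WeblogRecord.Record instance such as the one built in the earlier round-trip snippet:

// Wrap a generated message so Hadoop can serialize it as a Writable
ProtobufWritable<WeblogRecord.Record> writable =
        ProtobufWritable.newInstance(WeblogRecord.Record.class);
writable.set(record);

// The original message can later be recovered from the wrapper
WeblogRecord.Record restored = writable.get();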

In the mapper, we instantiate a ProtobufWritable instance.

    private ProtobufWritable<WeblogRecord.Record> protobufRecord = ProtobufWritable.newInstance(WeblogRecord.Record.class);

Then, we call the set method of the protobufRecord object with a new instance of WeblogRecord.Record. Finally, the mapper emits the protobufRecord object:

protobufRecord.set(WeblogRecord.Record.newBuilder()
                .setCookie(cookie)
                .setPage(page)
                .setTimestamp(timestamp.getTime())
                .setIp(ip)
                .build());
context.write(NullWritable.get(), protobufRecord);
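
To check what the job wrote, a second map-only job can read the block-compressed records back. The sketch below is illustrative rather than part of the recipe: it assumes Elephant Bird's LzoProtobufBlockInputFormat (the counterpart of the output format used above), a cluster with the LZO codec configured, and a hypothetical ProtobufReaderMapper class name:

public class ProtobufReaderMapper extends Mapper<LongWritable, ProtobufWritable<WeblogRecord.Record>, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, ProtobufWritable<WeblogRecord.Record> value, Context context)
            throws IOException, InterruptedException {
        // Unwrap the generated message and emit one readable line per record
        WeblogRecord.Record record = value.get();
        context.write(new Text(record.getIp() + "\t" + record.getPage()), NullWritable.get());
    }
}

Assuming the input format mirrors the output format's configuration hooks, the driver would set it up with LzoProtobufBlockInputFormat.setClassConf(WeblogRecord.Record.class, job.getConfiguration()) and job.setInputFormatClass(LzoProtobufBlockInputFormat.class).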