Using Apache Thrift to serialize data

Apache Thrift is a cross-language serialization and RPC framework. Thrift uses an interface definition language (IDL) file to generate bindings in many languages, including Java.

This recipe demonstrates how to define a Thrift interface, generate the corresponding Java bindings, and use those bindings to serialize a Java object to HDFS using MapReduce.

Getting ready

You will need to download, compile, and install the following:

  • Apache Thrift (http://thrift.apache.org/)
  • Elephant Bird (https://github.com/twitter/elephant-bird)

To compile and install Apache Thrift, first ensure that you have all of the required dependencies using Yum:

# yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel

Then build and install Thrift from its source directory using the standard ./configure, make, and make install steps.

Next, build Elephant Bird:

$ cd /path/to/elephant-bird
$ ant

Copy the elephant-bird-X.X.X.jar file to the classpath of your development environment.

How to do it...

  1. Set up the directory structure.
    $ mkdir test-thrift
    $ mkdir test-thrift/src
    $ mkdir test-thrift/src/thrift
    $ mkdir test-thrift/src/java
    $ cd test-thrift/src/thrift
  2. Next, create an interface definition:
    namespace java com.packt.hadoop.hdfs.ch2.thrift
    
    struct WeblogRecord {
      1: optional string cookie,
      2: string page,
      3: i64 timestamp,
      4: string ip
    }

    Save the file as weblog_record.thrift in the test-thrift/src/thrift/ folder.

  3. Compile the interface definition and generate the Java bindings (run this from the test-thrift folder):
    $ thrift --gen java -o src/java/ src/thrift/weblog_record.thrift

     Thrift places generated code in a gen-java subfolder, so you should now find a file named WeblogRecord.java under src/java/gen-java/com/packt/hadoop/hdfs/ch2/thrift/.

  4. Now, we will write a MapReduce application that reads weblog_entries.txt from HDFS and uses Elephant Bird's LzoThriftBlockOutputFormat class to serialize the WeblogRecord objects to an LZO-compressed file:
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import com.twitter.elephantbird.mapreduce.io.ThriftWritable;

    public class ThriftMapper extends Mapper<Object, Text, NullWritable, ThriftWritable<WeblogRecord>> {

        private ThriftWritable<WeblogRecord> thriftRecord = ThriftWritable.newInstance(WeblogRecord.class);
        private WeblogRecord record = new WeblogRecord();
        private SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // weblog_entries.txt is tab-delimited: cookie, page, date, time, ip
            String[] tokens = value.toString().split("\t");
            String cookie = tokens[0];
            String page = tokens[1];
            String date = tokens[2];
            String time = tokens[3];
            String formattedDate = date + ":" + time;
            Date timestamp = null;
            try {
                timestamp = dateFormatter.parse(formattedDate);
            } catch (ParseException ex) {
                return; // skip records with a malformed timestamp
            }
            String ip = tokens[4];
            record.setCookie(cookie);
            record.setPage(page);
            record.setTimestamp(timestamp.getTime());
            record.setIp(ip);
            thriftRecord.set(record);
            context.write(NullWritable.get(), thriftRecord);
        }
    }
  5. Finally, we will configure the MapReduce job:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
    import com.twitter.elephantbird.mapreduce.output.LzoThriftBlockOutputFormat;

    public class ThriftWriter extends Configured implements Tool {

        public int run(String[] args) throws Exception {

            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);

            Configuration conf = getConf();
            Job weblogJob = new Job(conf);
            weblogJob.setJobName("ThriftWriter");
            weblogJob.setJarByClass(getClass());
            // Map-only job: the mappers write directly to the output format
            weblogJob.setNumReduceTasks(0);
            weblogJob.setMapperClass(ThriftMapper.class);
            // These must match what ThriftMapper actually emits
            weblogJob.setOutputKeyClass(NullWritable.class);
            weblogJob.setOutputValueClass(ThriftWritable.class);
            weblogJob.setInputFormatClass(TextInputFormat.class);
            weblogJob.setOutputFormatClass(LzoThriftBlockOutputFormat.class);

            FileInputFormat.setInputPaths(weblogJob, inputPath);
            LzoThriftBlockOutputFormat.setClassConf(WeblogRecord.class, weblogJob.getConfiguration());
            LzoThriftBlockOutputFormat.setOutputPath(weblogJob, outputPath);

            if (weblogJob.waitForCompletion(true)) {
                return 0;
            }
            return 1;
        }

        public static void main(String[] args) throws Exception {
            int returnCode = ToolRunner.run(new ThriftWriter(), args);
            System.exit(returnCode);
        }
    }

How it works...

The first task required us to define and compile a Thrift interface definition. This definition file can be used to generate bindings in any language that Thrift supports.
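
To make the generated bindings concrete, here is a small standalone sketch (not part of the recipe). The setters match the ones used by ThriftMapper above, the sample values are hypothetical, and libthrift's TSerializer is used to turn the record into bytes outside of MapReduce:

    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    import com.packt.hadoop.hdfs.ch2.thrift.WeblogRecord;

    public class WeblogRecordDemo {
        public static void main(String[] args) throws TException {
            WeblogRecord record = new WeblogRecord();
            record.setCookie("abc123");                  // hypothetical sample values
            record.setPage("/index.html");
            record.setTimestamp(System.currentTimeMillis());
            record.setIp("127.0.0.1");

            // Serialize to a byte array using Thrift's standard binary protocol
            TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
            byte[] bytes = serializer.serialize(record);
            System.out.println("serialized " + bytes.length + " bytes");
        }
    }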

Next, we used Elephant Bird to build a MapReduce application that serializes instances of the Thrift-generated WeblogRecord class. To set up the MapReduce job, we set the input format to read a plain text file:

    weblogJob.setInputFormatClass(TextInputFormat.class);

Then the output format was configured to store the output records in Elephant Bird's Thrift block format, compressed with LZO:

    LzoThriftBlockOutputFormat.setClassConf(WeblogRecord.class, weblogJob.getConfiguration());
    LzoThriftBlockOutputFormat.setOutputPath(weblogJob, outputPath);
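
Reading the records back in a later job is symmetrical. The following sketch assumes Elephant Bird ships a matching LzoThriftBlockInputFormat with a setClassConf() hook mirroring the output format (check your Elephant Bird version for the exact class and method names); the mapper simply unwraps each ThriftWritable value:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import com.twitter.elephantbird.mapreduce.io.ThriftWritable;

    // Assumed job wiring, mirroring the writer above:
    //   readJob.setInputFormatClass(LzoThriftBlockInputFormat.class);
    //   LzoThriftBlockInputFormat.setClassConf(WeblogRecord.class, readJob.getConfiguration());

    public class ThriftReaderMapper
            extends Mapper<LongWritable, ThriftWritable<WeblogRecord>, Text, NullWritable> {

        private Text page = new Text();

        @Override
        protected void map(LongWritable key, ThriftWritable<WeblogRecord> value, Context context)
                throws IOException, InterruptedException {
            WeblogRecord record = value.get(); // unwrap the deserialized Thrift object
            page.set(record.getPage());
            context.write(page, NullWritable.get());
        }
    }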

In the mapper, we use Elephant Bird's ThriftWritable class to wrap the WeblogRecord object. ThriftWritable implements Hadoop's WritableComparable interface, which every key emitted in MapReduce must implement. For any class generated by Thrift, ThriftWritable saves us from having to write a custom WritableComparable wrapper by hand.
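
Because ThriftWritable behaves like any other Writable, a quick way to see what it does is a round trip through Hadoop's in-memory DataOutputBuffer and DataInputBuffer (both in org.apache.hadoop.io). This is a minimal sketch, not code from the recipe:

    ThriftWritable<WeblogRecord> out = ThriftWritable.newInstance(WeblogRecord.class);
    WeblogRecord record = new WeblogRecord();
    record.setPage("/index.html");
    record.setTimestamp(System.currentTimeMillis());
    out.set(record);

    // Write the wrapped record to an in-memory binary stream
    DataOutputBuffer buffer = new DataOutputBuffer();
    out.write(buffer);

    // Read it back through the Writable interface
    DataInputBuffer in = new DataInputBuffer();
    in.reset(buffer.getData(), buffer.getLength());
    ThriftWritable<WeblogRecord> back = ThriftWritable.newInstance(WeblogRecord.class);
    back.readFields(in);
    WeblogRecord copy = back.get(); // equal to the original record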

In the mapper, we create both a ThriftWritable wrapper and a WeblogRecord instance:

    private ThriftWritable<WeblogRecord> thriftRecord =
      ThriftWritable.newInstance(WeblogRecord.class);
    private WeblogRecord record = new WeblogRecord();

Then, we call the set method of the thriftRecord object with the populated WeblogRecord instance, and the mapper emits the wrapper. The key is NullWritable because the block output format writes only the serialized values, so the key carries no information:

    thriftRecord.set(record);
    context.write(NullWritable.get(), thriftRecord);

See also

The following recipe will demonstrate another popular serialization framework developed by Google:

  • Using Protocol Buffers to serialize data