Using Apache Avro to serialize data

The Apache Avro site describes Avro as a "data serialization system". Apache Avro supports a language-independent file format and includes serialization and RPC mechanisms. One of Avro's neat features is that you do not need to compile interface or protocol definition files in order to use its serialization support.
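
For example, Avro's reflect API can derive a schema from an ordinary Java class at runtime and write instances directly to an Avro container file. The following standalone sketch (not part of the recipe's MapReduce code) serializes the WeblogRecord class defined in step 1 of this recipe to a local file, using the Schema, ReflectData, ReflectDatumWriter, and DataFileWriter classes from the Avro library; the file name and sample values are arbitrary.

// Derive the schema from the class at runtime; no .avsc or IDL file is compiled
Schema schema = ReflectData.get().getSchema(WeblogRecord.class);
DataFileWriter<WeblogRecord> writer =
        new DataFileWriter<WeblogRecord>(new ReflectDatumWriter<WeblogRecord>(schema));
writer.create(schema, new File("weblog_records.avro"));
// Append one record with arbitrary sample values
writer.append(new WeblogRecord("cookie", "/index.html", new Date(), "127.0.0.1"));
writer.close();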

In this recipe, we will use Avro to serialize and write Java objects to a file in HDFS using MapReduce.

Getting ready

You will need to download/compile/install the following:

  • The Apache Avro library (the avro core and avro-mapred JARs) on your build and job classpath
  • The weblog_entries.txt dataset loaded into HDFS

How to do it...

  1. The following is a Java class that represents a row from the weblog_entries.txt dataset:
    public class WeblogRecord {
        private String cookie;
        private String page;
        private Date date;
        private String ip;
        
        public WeblogRecord() {
            
        }
        public WeblogRecord(String cookie, String page, Date date, String ip) {
            this.cookie = cookie;
            this.page = page;
            this.date = date;
            this.ip = ip;
        }
       //getters and setters 
    
        
        @Override
        public String toString() {
            return cookie + "\t" + page + "\t" + date.toString() + "\t" + ip;
        }
        
    }
  2. This will be a map-only job, like the job that was created to generate and read SequenceFiles. However, instead of using IdentityMapper, we will write a mapper that reads a row from weblog_entries.txt and creates an instance of WeblogRecord.
    public class WeblogMapper extends MapReduceBase implements Mapper<LongWritable, Text, AvroWrapper<WeblogRecord>, NullWritable> {
        
        private AvroWrapper<WeblogRecord> outputRecord = new AvroWrapper<WeblogRecord>();
    
        private WeblogRecord weblogRecord = new WeblogRecord();
    
        SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");
    
        public void map(LongWritable key, Text value, OutputCollector<AvroWrapper<WeblogRecord>, NullWritable> oc, Reporter rprtr) throws IOException {
    
            String[] tokens = value.toString().split("\t");
            String cookie = tokens[0];
            String page = tokens[1];
            String date = tokens[2];
            String time = tokens[3];
            String formattedDate = date + ":" + time;
            Date timestamp = null;
            try {
                timestamp = dateFormatter.parse(formattedDate);
            } catch(ParseException ex) {
                // ignore records with invalid dates
                return;
            }
            String ip = tokens[4];
            
            weblogRecord.setCookie(cookie);
            weblogRecord.setDate(timestamp);
            weblogRecord.setIp(ip);
            weblogRecord.setPage(page);
            outputRecord.datum(weblogRecord);
            oc.collect(outputRecord, NullWritable.get());
        }
        
    }
  3. Now, use the MapReduce job to read a text file, and then serialize and persist the WeblogRecord object:
    public class AvroWriter extends Configured implements Tool {
    
        public int run(String[] args) throws Exception {
    
            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);
    
            Schema schema = ReflectData.get().getSchema(WeblogRecord.class);
    
            Configuration conf = getConf();
            JobConf weblogJob = new JobConf(conf, getClass());
            weblogJob.setJobName("Avro Writer");
            weblogJob.setNumReduceTasks(0);
            weblogJob.setMapperClass(WeblogMapper.class);
            weblogJob.setMapOutputKeyClass(AvroWrapper.class);
            weblogJob.setMapOutputValueClass(NullWritable.class);
            weblogJob.setInputFormat(TextInputFormat.class);
            AvroJob.setOutputSchema(weblogJob, schema);
            FileInputFormat.setInputPaths(weblogJob, inputPath);
            FileOutputFormat.setOutputPath(weblogJob, outputPath);
            
            RunningJob job = JobClient.runJob(weblogJob);
            if(job.isSuccessful()) {
                return 0;
            }
            return 1;
        }
    
        public static void main(String[] args) throws Exception {
            int returnCode = ToolRunner.run(new AvroWriter(), args);
            System.exit(returnCode);
        }
    }

How it works...

The AvroWriter MapReduce job reads a plain text file and serializes the WeblogRecord class into an Avro file. The first step is to set up a MapReduce job to read the text file and write the output file using the Avro file format.

Set the input format to read a text file:

weblogJob.setInputFormat(TextInputFormat.class);

Build an Avro schema based on the WeblogRecord class, and then set the output schema:

Schema schema = ReflectData.get().getSchema(WeblogRecord.class);
AvroJob.setOutputSchema(weblogJob, schema);
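
If you want to inspect the schema that reflection generates before submitting the job, you can pretty-print it as JSON. This is an optional debugging step, not part of the recipe:

// Print the reflect-generated schema as formatted JSON
System.out.println(schema.toString(true));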

Next, we use the old Hadoop MapReduce API to write the mapper and emit the WeblogRecord object by using the AvroWrapper class.

The relevant members of the WeblogMapper class are:

private AvroWrapper<WeblogRecord> outputRecord = new AvroWrapper<WeblogRecord>();
private WeblogRecord weblogRecord = new WeblogRecord();

The record is emitted from the WeblogMapper map() method as follows:

outputRecord.datum(weblogRecord);
oc.collect(outputRecord, NullWritable.get());

The output of this map-only job is stored in the Avro file format.
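
A quick way to verify this output outside MapReduce is to read one of the part files back with Avro's reflect reader. The following is a hedged sketch: the output directory and part file name (part-00000.avro) are assumptions, so list your output directory to confirm the actual name. FsInput, DataFileReader, and ReflectDatumReader come from the Avro library; Path and Configuration are the usual Hadoop classes.

Configuration conf = new Configuration();
// The path below is illustrative; point it at an actual part file in your output directory
FsInput input = new FsInput(new Path("output/part-00000.avro"), conf);
ReflectDatumReader<WeblogRecord> datumReader =
        new ReflectDatumReader<WeblogRecord>(WeblogRecord.class);
DataFileReader<WeblogRecord> reader =
        new DataFileReader<WeblogRecord>(input, datumReader);
for (WeblogRecord record : reader) {
    System.out.println(record);   // uses WeblogRecord.toString()
}
reader.close();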

There's more...

To read the Avro file format produced by the AvroWriter job, we just need to change the input format and the mapper class. First, set the input format and the input schema.

JobConf weblogJob = new JobConf(conf, getClass());
Schema schema = ReflectData.get().getSchema(WeblogRecord.class);
AvroJob.setInputSchema(weblogJob, schema);
AvroJob.setReflect(weblogJob);

Next, create a mapper class with the following definition:

public class WeblogMapperAvro extends MapReduceBase
        implements Mapper<AvroWrapper<WeblogRecord>, NullWritable, Text, NullWritable> {

    public void map(AvroWrapper<WeblogRecord> key, NullWritable value, OutputCollector<Text, NullWritable> oc, Reporter rprtr) throws IOException {
        WeblogRecord weblogRecord = key.datum();
        // process the weblog record
    }
}
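
For completeness, a driver for this read job could look like the following sketch. It mirrors the AvroWriter driver above, but the class name AvroReader, the use of TextOutputFormat, and the explicit AvroInputFormat setting are assumptions rather than part of the original recipe.

public class AvroReader extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Schema schema = ReflectData.get().getSchema(WeblogRecord.class);

        JobConf weblogJob = new JobConf(getConf(), getClass());
        weblogJob.setJobName("Avro Reader");
        weblogJob.setNumReduceTasks(0);
        weblogJob.setMapperClass(WeblogMapperAvro.class);
        weblogJob.setOutputKeyClass(Text.class);
        weblogJob.setOutputValueClass(NullWritable.class);

        // Read Avro files using the reflect-based WeblogRecord schema
        weblogJob.setInputFormat(AvroInputFormat.class);
        AvroJob.setInputSchema(weblogJob, schema);
        AvroJob.setReflect(weblogJob);

        // Write the mapper's Text output as plain text
        weblogJob.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(weblogJob, inputPath);
        FileOutputFormat.setOutputPath(weblogJob, outputPath);

        RunningJob job = JobClient.runJob(weblogJob);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AvroReader(), args));
    }
}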

See also

The following recipes will demonstrate additional data serialization libraries that can be used with Hadoop:

  • Using Apache Thrift to serialize data
  • Using Protocol Buffers to serialize data