Writing to HDFS as Parquet files

Since we want to write Parquet files into HDFS, we use the same BucketingSink as before, but with a custom Parquet writer and a DateTimeBucketer that partitions the output by minute, as shown in the following code. The bucketing sink path is passed as a command-line argument, hdfsPath, which we will discuss later. Update the HADOOP_USER_NAME system property in the code to your user account name in CentOS:

System.setProperty("HADOOP_USER_NAME", "centos");

// HDFS sink: the bucket base path is read from the hdfsPath command-line argument
BucketingSink<Tuple2<IntWritable, Text>> hdfsSink =
        new BucketingSink<Tuple2<IntWritable, Text>>(parameterTool.getRequired("hdfsPath"));
// Partition the output into minute-based buckets
hdfsSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH-mm"));
// Use the custom Parquet writer with the Avro schema for the address object
hdfsSink.setWriter(new SinkParquetWriter<Tuple2<IntWritable, Text>>("address.avsc"));
// Roll over to a new part file after 1 MB
hdfsSink.setBatchSize(1024 * 1024 * 1);
messageStream.addSink(hdfsSink);
Code 06: HDFS Bucket Sink
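
The hdfsPath value is supplied as a program argument and read through Flink's ParameterTool. The following is a minimal sketch of how the surrounding job could obtain it; the class name HdfsParquetJob and the example HDFS URL are illustrative, not part of the book's listing:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsParquetJob {
    public static void main(String[] args) throws Exception {
        // Reads --hdfsPath from the program arguments, for example:
        //   flink run ... --hdfsPath hdfs://localhost:8020/user/centos/addresses
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Optionally expose the parameters in the Flink web UI
        env.getConfig().setGlobalJobParameters(parameterTool);

        // ... build messageStream and attach the bucketing sink from Code 06 here ...

        env.execute("HDFS Parquet Sink");
    }
}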

The custom Parquet writer writes a given data object to HDFS as an Avro record, which is converted to Parquet format using AvroParquetWriter with SNAPPY compression enabled, as shown here for the address data object:

private static class SinkParquetWriter<T> implements Writer<T> {

    // The underlying Parquet writer is created in open() and is not serializable
    transient ParquetWriter<GenericRecord> writer = null;
    // Name of the Avro schema file on the classpath, for example address.avsc
    String schema = null;
    transient Schema schemaInstance = null;
    final ObjectMapper MAPPER = new ObjectMapper();

    public SinkParquetWriter(String schema) {
        this.schema = schema;
        try {
            // Load and parse the Avro schema from the classpath
            this.schemaInstance = new Schema.Parser().parse(getClass().getClassLoader()
                    .getResourceAsStream(schema));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void open(FileSystem fileSystem, Path path) throws IOException {
        // Create an Avro-backed Parquet writer with SNAPPY compression
        writer = AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(this.schemaInstance)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
    }

    public long flush() throws IOException {
        return writer.getDataSize();
    }

    public long getPos() throws IOException {
        return writer.getDataSize();
    }

    public void close() throws IOException {
        writer.close();
    }

    public void write(T t) throws IOException {
        final Tuple2<IntWritable, Text> tuple = (Tuple2<IntWritable, Text>) t;
        // Deserialize the JSON payload into an Address object
        Address address = MAPPER.readValue(tuple.f1.toString(), Address.class);

        // Map the Address fields onto an Avro GenericRecord matching address.avsc
        GenericRecord record = new GenericData.Record(schemaInstance);
        record.put("id", String.valueOf(address.getId()));
        record.put("customerId", address.getCustomerId());
        record.put("street1", address.getStreet1());
        record.put("street2", address.getStreet2());
        record.put("city", address.getCity());
        record.put("state", address.getState());
        record.put("country", address.getCountry());
        record.put("zipCode", address.getZipCode());

        writer.write(record);
    }

    public Writer<T> duplicate() {
        return new SinkParquetWriter<T>(schema);
    }
}
Code 07: Custom Parquet Writer
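
The writer assumes that the incoming Text payload is JSON that Jackson can bind to an Address POJO, and that address.avsc declares the same eight fields that are set on the GenericRecord. A minimal sketch of such a POJO is shown here; the field names and getters are inferred from the record.put calls in Code 07, and the actual class in the project may differ:

// Hypothetical Address POJO inferred from Code 07; not part of the original listing
public class Address {
    // Public fields let Jackson bind the incoming JSON without explicit setters
    public int id;           // written to Parquet as a string via String.valueOf
    public String customerId;
    public String street1;
    public String street2;
    public String city;
    public String state;
    public String country;
    public String zipCode;

    public int getId() { return id; }
    public String getCustomerId() { return customerId; }
    public String getStreet1() { return street1; }
    public String getStreet2() { return street2; }
    public String getCity() { return city; }
    public String getState() { return state; }
    public String getCountry() { return country; }
    public String getZipCode() { return zipCode; }
}

Whatever shape the real POJO has, the field names written to the GenericRecord must match the field names declared in address.avsc, otherwise the Avro writer will reject the record.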