Since we want to write Parquet files into HDFS, we would be using the same BucketSink as used before but with a custom Parquet Writer and a DateTimeBucketer with a minute-based partition, as shown as follows. The Bucket sink path is passed as a command line argument, hdfsPath, which we will discuss later. Update the HADOOP_USER_NAME system property in the code to your user account name in CentOS:
System.setProperty("HADOOP_USER_NAME", "centos");
//HDFS Sink
BucketingSink<Tuple2<IntWritable, Text>> hdfsSink = new BucketingSink<Tuple2<IntWritable, Text>>(parameterTool.getRequired("hdfsPath"));
hdfsSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH-mm"));
hdfsSink.setWriter(new SinkParquetWriter<Tuple2<IntWritable, Text>>("address.avsc"));
hdfsSink.setBatchSize(1024 * 1024 * 1);
messageStream.addSink(hdfsSink);
Code 06: HDFS Bucket Sink
The custom Parquet writer writes a specific data object as an Avro Record to HDFS. It is then converted to Parquet format using the AvroParquet class with SNAPPY compression enabled as shown here for the address data object:
private static class SinkParquetWriter<T> implements Writer<T> {
transient ParquetWriter writer = null;
String schema = null;
transient Schema schemaInstance = null;
final ObjectMapper MAPPER = new ObjectMapper();
public SinkParquetWriter(String schema) {
this.writer = writer;
this.schema = schema;
try {
this.schemaInstance = new Schema.Parser().parse(getClass().getClassLoader()
.getResourceAsStream(schema));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void open(FileSystem fileSystem, Path path) throws IOException {
writer = AvroParquetWriter.builder(path)
.withSchema(this.schemaInstance)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
}
public long flush() throws IOException {
return writer.getDataSize();
}
public long getPos() throws IOException {
return writer.getDataSize();
}
public void close() throws IOException {
writer.close();
}
public void write(T t) throws IOException {
final Tuple2<IntWritable, Text> tuple = (Tuple2<IntWritable, Text>) t;
final List values = new ArrayList();
GenericRecord record = new GenericData.Record(schemaInstance);
String inputRecord=tuple.f1.toString();
Address address = MAPPER.readValue(inputRecord,
Address.class);
record.put("id", String.valueOf(address.getId()));
record.put("customerId", address.getCustomerId());
record.put("street1", address.getStreet1());
record.put("street2", address.getStreet2());
record.put("city", address.getCity());
record.put("state", address.getState());
record.put("country", address.getCountry());
record.put("zipCode", address.getZipCode());
writer.write(record);
}
public Writer<T> duplicate() {
return new SinkParquetWriter<T>(schema);
}
}
Code 07: Custom Parquet Writer