Creating a dataset using StructType

In the previous section, we discussed the creation of a dataset from an RDD with the help of encoders. In Chapter 5, Working with Data and Storage, we also discussed a couple of different ways to create DataFrames, an alias for Dataset&lt;Row&gt;, from JSON, CSV, and XML files. There is another way in which we can build a schema dynamically and thereby form a Dataset&lt;Row&gt;. The first step is to convert an RDD&lt;T&gt; to an RDD&lt;Row&gt; using the factory methods of the RowFactory class. We then identify the number of fields in the data, create a schema of type StructType that represents the structure of the RDD&lt;Row&gt;, and finally pass both the RDD&lt;Row&gt; and the schema to the createDataFrame method of SparkSession.

As an example, let's assume we have a text file whose first row contains the column names and whose remaining rows contain the data:
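For instance, dept.txt might contain records such as the following (the dname and loc column names and the row values are hypothetical; only the deptno header is relied on by the code below):

```
deptno,dname,loc
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
```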

//Create an RDD
JavaRDD<String> deptRDD = sparkSession.sparkContext()
  .textFile("src/main/resources/dept.txt", 1)
  .toJavaRDD();

//Convert the RDD<String> to RDD<Row>, skipping the header row
JavaRDD<Row> deptRows = deptRDD.filter(str -> !str.contains("deptno")).map(new Function<String, Row>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Row call(String rowString) throws Exception {
    String[] cols = rowString.split(",");
    return RowFactory.create(cols[0].trim(), cols[1].trim(), cols[2].trim());
  }
});
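The parsing inside call() can be illustrated as a standalone sketch, independent of Spark. The sample record here is hypothetical; the point is that split(",") breaks the line into fields and trim() strips any stray whitespace around each field before it is handed to RowFactory.create:

```java
public class RowParseSketch {
  public static void main(String[] args) {
    // A hypothetical comma-separated record with uneven spacing
    String rowString = "10, ACCOUNTING , NEW YORK";

    // Same logic as the call() method above: split, then trim each field
    String[] cols = rowString.split(",");
    System.out.println(cols[0].trim() + "|" + cols[1].trim() + "|" + cols[2].trim());
    // prints "10|ACCOUNTING|NEW YORK"
  }
}
```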

//Create the schema from the header row
String[] schemaArr = deptRDD.first().split(",");
List<StructField> structFieldList = new ArrayList<>();
for (String fieldName : schemaArr) {
  StructField structField = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  structFieldList.add(structField);
}
StructType schema = DataTypes.createStructType(structFieldList);

Dataset<Row> deptDf = sparkSession.createDataFrame(deptRows, schema);
deptDf.printSchema();
deptDf.show();
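Assuming the header row holds three columns named deptno, dname, and loc (hypothetical names used here for illustration), printSchema() would produce output along these lines, since every field was registered as a nullable StringType:

```
root
 |-- deptno: string (nullable = true)
 |-- dname: string (nullable = true)
 |-- loc: string (nullable = true)
```

Note that every column is a string because we hardcoded DataTypes.StringType when building the StructField list; columns holding numeric data would need an explicit cast, or a different DataTypes entry, to be treated as numbers.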