Working with CSV data

CSV files can be treated as plain text files that use a comma as the delimiter, and this approach generally works as expected. Suppose we have movie data in the following format:

movieId, title, genre

Let's load this file as an RDD of Movie objects, as follows:

Movie POJO:

import java.io.Serializable;
import java.util.stream.Stream;

public class Movie implements Serializable {
    private Integer movieId;
    private String title;
    private String genre;

    public Movie() {
    }

    public Movie(Integer movieId, String title, String genre) {
        this.movieId = movieId;
        this.title = title;
        this.genre = genre;
    }

    public Integer getMovieId() {
        return movieId;
    }

    public void setMovieId(Integer movieId) {
        this.movieId = movieId;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getGenre() {
        return genre;
    }

    public void setGenre(String genre) {
        this.genre = genre;
    }

    // Parses one CSV line of the form "movieId,title,genre" into a Movie.
    public static Movie parseRating(String str) {
        String[] fields = str.split(",");
        if (fields.length != 3) {
            System.out.println("The elements are ::");
            Stream.of(fields).forEach(System.out::println);
            throw new IllegalArgumentException(
                "Each line must contain 3 fields while the current line has ::" + fields.length);
        }
        // trim() tolerates spaces around the delimiter
        Integer movieId = Integer.parseInt(fields[0].trim());
        String title = fields[1].trim();
        String genre = fields[2].trim();
        return new Movie(movieId, title, genre);
    }

    // Gives println(movie) a readable result instead of the default object hash
    @Override
    public String toString() {
        return "Movie [movieId=" + movieId + ", title=" + title + ", genre=" + genre + "]";
    }
}

Spark Code:

Example for working on CSV data using Java 7:

JavaRDD<Movie> moviesRDD = sparkSession.read().textFile("movies.csv")
    .javaRDD()
    .filter(new Function<String, Boolean>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Boolean call(String str) {
            // Skip null lines, empty lines, and the header row
            return str != null && str.length() != 0 && !str.contains("movieId");
        }
    })
    .map(new Function<String, Movie>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Movie call(String str) {
            return Movie.parseRating(str);
        }
    });

moviesRDD.foreach(new VoidFunction<Movie>() {
    @Override
    public void call(Movie m) throws Exception {
        System.out.println(m);
    }
});

Example for working on CSV data using Java 8:

JavaRDD<Movie> moviesRDD = sparkSession.read().textFile("movies.csv")
.javaRDD().filter(str -> !(null == str))
.filter(str -> !(str.length()==0))
.filter(str -> !str.contains("movieId"))
.map(str -> Movie.parseRating(str));
moviesRDD.foreach(m -> System.out.println(m));

However, this approach fails when a field itself contains a comma, unless such cases are handled explicitly. In other cases, one may want to infer the schema of the CSV file or read its header information. For all such scenarios, one can use the Spark CSV library by Databricks:

Dataset<Row> csv_read = sparkSession.read().format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("movies.csv");
csv_read.printSchema();
csv_read.show();
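
To see why a plain split(",") is not enough for fields that contain commas, the following stand-alone sketch compares it with a minimal quote-aware splitter. The class QuoteAwareSplit, the method splitQuoted, and the sample line are hypothetical and only illustrate the idea; this is not how the library is implemented, and escape characters are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: QuoteAwareSplit and splitQuoted are hypothetical names,
// not part of Spark or the Spark CSV library.
class QuoteAwareSplit {

    // Splits one CSV line on commas, ignoring commas inside double quotes.
    // The quote characters themselves are dropped; escapes are not handled.
    static List<String> splitQuoted(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes;            // opening or closing quote
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());  // field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());          // last field
        return fields;
    }

    public static void main(String[] args) {
        // Assumed sample line with a comma inside the quoted title
        String line = "11,\"American President, The (1995)\",Comedy";
        System.out.println(line.split(",").length);      // naive split yields 4 tokens
        System.out.println(splitQuoted(line).size());    // quote-aware split yields 3 fields
    }
}
```

With the naive split, Movie.parseRating would reject this line because it sees four tokens instead of three.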

The default inferred schema can be overridden by specifying a schema as a StructType whose StructField elements represent the CSV fields:

StructType customSchema = new StructType(new StructField[] {
    new StructField("movieId", DataTypes.LongType, true, Metadata.empty()),
    new StructField("title", DataTypes.StringType, true, Metadata.empty()),
    new StructField("genres", DataTypes.StringType, true, Metadata.empty())
});

Dataset<Row> csv_custom_read = sparkSession.read().format("com.databricks.spark.csv")
.option("header", "true")
.schema(customSchema)
.load("movies.csv");
csv_custom_read.printSchema();
csv_custom_read.show();

Some of the options that can be set with the option() method while reading a CSV file are:

  • Header: When set to true, the first line of each file is used to name the columns and is not included in the data. All columns are assumed to be strings. The default value is false.
  • Delimiter: By default, columns are delimited using a comma (,), but the delimiter can be set to any character.
  • Quote: By default, the quote character is ", but it can be set to any character. Delimiters inside quotes are ignored.
  • Escape: By default, the escape character is \, but it can be set to any character. Escaped quote characters are ignored.
  • Mode: Determines the parsing mode. By default, it is PERMISSIVE. Possible values are:
    • PERMISSIVE: Tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.

    • DROPMALFORMED: Drops lines that have fewer or more tokens than expected or tokens that do not match the schema.

    • FAILFAST: Aborts with a RuntimeException if it encounters any malformed lines.

  • Charset: Defaults to UTF-8, but can be set to other valid charset names.
  • InferSchema: Automatically infers column types. It requires one extra pass over the data and is false by default.
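
The difference between the three parsing modes can be illustrated without Spark. The sketch below is a stand-alone approximation of the behaviour described in the list above; ParseModeSketch, its Mode enum, and the sample lines are hypothetical, and the sketch only checks token counts (it does not model tokens that fail to match the schema types).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: ParseModeSketch is a hypothetical class that mimics the
// documented behaviour of the modes; it is not the library's implementation.
class ParseModeSketch {

    enum Mode { PERMISSIVE, DROPMALFORMED, FAILFAST }

    // Parses each line into a row of `expected` tokens according to `mode`.
    static List<String[]> parse(List<String> lines, int expected, Mode mode) {
        List<String[]> rows = new ArrayList<>();
        for (String line : lines) {
            String[] tokens = line.split(",", -1); // -1 keeps trailing empty tokens
            if (tokens.length == expected) {
                rows.add(tokens);
                continue;
            }
            switch (mode) {
                case PERMISSIVE:
                    // copyOf pads missing tokens with null and drops extra ones
                    rows.add(Arrays.copyOf(tokens, expected));
                    break;
                case DROPMALFORMED:
                    break; // silently skip the malformed line
                case FAILFAST:
                    throw new RuntimeException("Malformed line: " + line);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Assumed sample data: one good line, one short line, one long line
        List<String> lines = Arrays.asList(
            "1,Toy Story,Animation", "2,Jumanji", "3,Heat,Action,extra");
        System.out.println(parse(lines, 3, Mode.PERMISSIVE).size());    // keeps all 3 rows
        System.out.println(parse(lines, 3, Mode.DROPMALFORMED).size()); // keeps 1 row
    }
}
```

In Spark itself, the mode is selected with option("mode", ...) using one of the values listed above.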

The data in a Dataset can be saved to disk by using its write() method, along with the output format and a compression codec, if any, as follows:

csv_custom_read.write()
.format("com.databricks.spark.csv")
.option("header", "true")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.save("newMovies.csv");

Some of the common options that can be set with the option() method while writing a CSV file are:

  • Header: When set to true, the header (from the schema in the DataFrame) is written as the first line.
  • Delimiter: By default, columns are delimited using a comma (,), but the delimiter can be set to any character.
  • Quote: By default, the quote character is ", but it can be set to any character. This is written according to quoteMode.
  • Escape: By default, the escape character is \, but it can be set to any character. Escaped quote characters are written.
  • Codec: Compression codec to use when saving to file. Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec or one of the case-insensitive shortened names (bzip2, gzip, lz4, and snappy). Defaults to no compression when a codec is not specified.

Instead of a comma (,), the library can read any character-separated file when that character is specified with the delimiter option.
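
How the delimiter, quote, and escape options interact on the write side can be sketched in plain Java. The class CsvWriteSketch and its methods are hypothetical, and the sketch assumes a minimal quoting policy (quote only fields that contain the delimiter or a quote character); the library's actual behaviour depends on quoteMode.

```java
// Illustrative only: CsvWriteSketch, formatField, and formatRow are
// hypothetical names, not part of Spark or the Spark CSV library.
class CsvWriteSketch {

    // Wraps the value in quote characters when it contains the delimiter or a
    // quote, and prefixes each embedded quote with the escape character.
    static String formatField(String value, char delimiter, char quote, char escape) {
        boolean needsQuoting = value.indexOf(delimiter) >= 0 || value.indexOf(quote) >= 0;
        if (!needsQuoting) {
            return value;
        }
        StringBuilder sb = new StringBuilder().append(quote);
        for (char c : value.toCharArray()) {
            if (c == quote) {
                sb.append(escape); // write the escaped quote character
            }
            sb.append(c);
        }
        return sb.append(quote).toString();
    }

    // Joins the formatted fields of one row with the delimiter.
    static String formatRow(char delimiter, char quote, char escape, String... values) {
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                row.append(delimiter);
            }
            row.append(formatField(values[i], delimiter, quote, escape));
        }
        return row.toString();
    }

    public static void main(String[] args) {
        // Comma-delimited: the title contains a comma, so it gets quoted
        System.out.println(formatRow(',', '"', '\\', "11", "American President, The (1995)", "Comedy"));
        // Tab-delimited: the same title needs no quoting
        System.out.println(formatRow('\t', '"', '\\', "11", "American President, The (1995)", "Comedy"));
    }
}
```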