Plain and specially formatted text

Plain text can be read in Spark by calling the textFile() method on SparkContext. For specially formatted text, such as files whose fields are separated by whitespace, tabs, or tildes (~), apply the map() function to each line and split the line on the delimiter, for example the tilde (~) in a tilde-separated file.

Suppose we have tilde-separated files that contain data about people in the following format:

name~age~occupation 
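To make the record format concrete, the following is a minimal, Spark-free sketch of splitting a single line into its three fields. The class name TildeLineParser and the field-count check are assumptions added for illustration; they are not part of Spark or of the examples that follow.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not a Spark API): splits one "name~age~occupation" record.
final class TildeLineParser {
    // Quote the delimiter so that it is always matched literally.
    private static final Pattern DELIMITER = Pattern.compile(Pattern.quote("~"));

    private TildeLineParser() {}

    static String[] parse(String line) {
        // A limit of -1 keeps trailing empty fields, so "Ann~30~" still yields three parts.
        String[] parts = DELIMITER.split(line, -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 fields but found " + parts.length + ": " + line);
        }
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] parts = parse("John ~ 32 ~Engineer");
        System.out.println(parts[0] + ", " + Integer.parseInt(parts[1]) + ", " + parts[2]);
    }
}
```

Rejecting lines with the wrong number of fields up front is one possible design choice; it avoids an ArrayIndexOutOfBoundsException when a malformed line reaches code that reads parts[2].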

Let's load this file as an RDD of Person objects, as follows:

Person POJO:

import java.io.Serializable;

public class Person implements Serializable {
  private String name;
  private Integer age;
  private String occupation;

  public String getOccupation() {
    return occupation;
  }
  public void setOccupation(String occupation) {
    this.occupation = occupation;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public Integer getAge() {
    return age;
  }
  public void setAge(Integer age) {
    this.age = age;
  }
}

Example for plain and specially formatted text using Java 7:

JavaRDD<String> textFile = jsc.textFile("Path of the file");

JavaRDD<Person> people = textFile.map(new Function<String, Person>() {
  @Override
  public Person call(String line) throws Exception {
    String[] parts = line.split("~");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    person.setOccupation(parts[2]);
    return person;
  }
});

people.foreach(new VoidFunction<Person>() {
  @Override
  public void call(Person p) throws Exception {
    System.out.println(p);
  }
});

Example for plain and specially formatted text using Java 8:

JavaRDD<String> textFile = jsc.textFile("Path of the file");

JavaRDD<Person> people = textFile.map(line -> {
  String[] parts = line.split("~");
  Person person = new Person();
  person.setName(parts[0]);
  person.setAge(Integer.parseInt(parts[1].trim()));
  person.setOccupation(parts[2]);
  return person;
});
people.foreach(p -> System.out.println(p));

POJOs must be serializable because an RDD is a distributed data structure: during shuffle operations triggered by transformations and actions, data is transferred over the network.
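The serialization requirement can be checked outside Spark with a round trip through Java's built-in serialization. This is only a sketch: SerializablePerson is a hypothetical, reduced stand-in for the Person class, and SerializationCheck is a helper invented for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Person POJO, reduced to two fields.
final class SerializablePerson implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;
    private final int age;

    SerializablePerson(String name, int age) {
        this.name = name;
        this.age = age;
    }

    String getName() { return name; }
    int getAge() { return age; }
}

// Hypothetical helper: writes a value to bytes and reads it back.
final class SerializationCheck {
    private SerializationCheck() {}

    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            // NotSerializableException is an IOException, so it ends up here too.
            throw new IllegalStateException("Value could not be serialized", e);
        }
    }

    public static void main(String[] args) {
        SerializablePerson copy = roundTrip(new SerializablePerson("John", 32));
        System.out.println(copy.getName() + " " + copy.getAge());
    }
}
```

If the class did not implement Serializable, writeObject() would throw a NotSerializableException, which this helper rethrows as an IllegalStateException.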

Reading a text file into an RDD of strings and then applying map() to convert each line into a Java object works well as long as creating the object for each row is inexpensive. When the per-row work involves costly setup, mapPartitions() can perform better than map(), because the function is called once per partition and receives an iterator over that partition's elements. As a rule of thumb, wherever later sections use map(), also consider whether mapPartitions() is a better fit. Any setup step, such as creating a database connection, opening a socket, or establishing a web session, carries overhead, and the resulting object can be reused across the elements of a partition. In all such scenarios, mapPartitions() is likely to work much better than map().

Java 7:

JavaRDD<Person> peoplePart = textFile.mapPartitions(new FlatMapFunction<Iterator<String>, Person>() {
  @Override
  public Iterator<Person> call(Iterator<String> p) throws Exception {
    ArrayList<Person> personList = new ArrayList<Person>();
    while (p.hasNext()) {
      String[] parts = p.next().split("~");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      person.setOccupation(parts[2]);
      personList.add(person);
    }
    return personList.iterator();
  }
});

peoplePart.foreach(new VoidFunction<Person>() {
  @Override
  public void call(Person p) throws Exception {
    System.out.println(p);
  }
});

Java 8:

JavaRDD<Person> peoplePart = textFile.mapPartitions(p -> {
  ArrayList<Person> personList = new ArrayList<Person>();
  while (p.hasNext()) {
    String[] parts = p.next().split("~");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    person.setOccupation(parts[2]);
    personList.add(person);
  }
  return personList.iterator();
});
peoplePart.foreach(p -> System.out.println(p));
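The benefit of per-partition setup can be illustrated without a cluster. The following Spark-free simulation is only a sketch: partitions are modelled as plain lists, and FakeConnection is a hypothetical stand-in for an expensive resource such as a database connection. It counts how often the resource is created under each style.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

final class PartitionSetupDemo {
    // Counts how many times the simulated expensive resource is created.
    static int connectionsOpened = 0;

    // Hypothetical stand-in for a costly resource such as a database connection.
    static final class FakeConnection {
        FakeConnection() {
            connectionsOpened++;
        }

        String enrich(String line) {
            return line.toUpperCase(Locale.ROOT);
        }
    }

    // map()-style: the function sees one element at a time, so setup runs per element.
    static List<String> perElement(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<String> partition : partitions) {
            for (String line : partition) {
                out.add(new FakeConnection().enrich(line));
            }
        }
        return out;
    }

    // mapPartitions()-style: the function receives an iterator over a whole
    // partition, so setup runs once per partition and the resource is reused.
    static List<String> perPartition(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<String> partition : partitions) {
            FakeConnection connection = new FakeConnection();
            Iterator<String> elements = partition.iterator();
            while (elements.hasNext()) {
                out.add(connection.enrich(elements.next()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("john~32~engineer", "ann~30~doctor"),
            Arrays.asList("raj~41~teacher"));

        connectionsOpened = 0;
        perElement(partitions);
        System.out.println("per element: " + connectionsOpened);

        connectionsOpened = 0;
        perPartition(partitions);
        System.out.println("per partition: " + connectionsOpened);
    }
}
```

With two partitions holding three lines in total, the per-element style creates three connections and the per-partition style creates two. The gap widens as partitions grow, since the setup count tracks the number of partitions instead of the number of rows.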

Text can be written in a similar manner by calling the saveAsTextFile() method on the RDD. The toString() method is called on each RDD element, and one element is written per line:

people.saveAsTextFile("Path of output file");
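Because each line of the output is the element's toString() result, and the Person POJO shown earlier does not override toString(), the saved lines would fall back to the default Object.toString() form (class name and hash code). One possible fix, sketched here with a hypothetical PersonRecord class used only for illustration, is to override toString() so that it emits the same tilde-separated layout as the input:

```java
import java.io.Serializable;

// Hypothetical variant of the Person POJO whose toString() matches the input format.
final class PersonRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;
    private final int age;
    private final String occupation;

    PersonRecord(String name, int age, String occupation) {
        this.name = name;
        this.age = age;
        this.occupation = occupation;
    }

    // Each saved line is whatever this method returns for the element.
    @Override
    public String toString() {
        return name + "~" + age + "~" + occupation;
    }

    public static void main(String[] args) {
        System.out.println(new PersonRecord("John", 32, "Engineer"));
    }
}
```

An alternative with the same effect would be to map each object to a formatted string before saving, which keeps the output format out of the POJO.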

The number of output files is equal to the number of partitions in the RDD, so it can be controlled by calling the repartition() method on the RDD with the desired number of partitions before saving the text file:

people.repartition(1).saveAsTextFile("Path of output file");

Although repartition() controls the number of output files, it can be expensive because it shuffles the data. The coalesce() method is another option that can be used in place of repartition(); it tries to avoid shuffling data, so it may be better suited to this kind of operation:

people.coalesce(1).saveAsTextFile("Path of output file");

Refer to the Advance Spark Transformations section in Chapter 7, Spark Programming Model – Advanced, for a detailed explanation of how the repartition and coalesce transformations work.