mapPartitions

The mapPartitions transformation works like the map transformation. However, instead of acting upon each element of the RDD, it acts upon each partition of the RDD. The supplied function is therefore executed once per RDD partition, and there is a one-to-one mapping between the partitions of the source RDD and those of the target RDD.

As a partition of an RDD is stored as a whole on a node, this transformation does not require shuffling.

In the following example, we will create an RDD of integers and increment all elements of the RDD by 1 using mapPartitions:

JavaRDD<Integer> intRDD = jsc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),2);

Java 7:

intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
    @Override
    public Iterator<Integer> call(Iterator<Integer> iterator) throws Exception {
        List<Integer> intList = new ArrayList<>();
        while (iterator.hasNext()) {
            intList.add(iterator.next() + 1);
        }
        return intList.iterator();
    }
});

Java 8:

intRDD.mapPartitions(iterator -> {
    List<Integer> intList = new ArrayList<>();
    while (iterator.hasNext()) {
        intList.add(iterator.next() + 1);
    }
    return intList.iterator();
});

The mapPartitions transformation provides an iterator over the elements of the RDD that belong to the partition being processed.
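
To make the once-per-partition behavior concrete, the following is a minimal plain-Java sketch that models it without Spark. The class MapPartitionsModel, its calls counter, and the assumption that the two partitions hold 1 to 5 and 6 to 10 are illustrative only and are not part of the Spark API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Toy model of mapPartitions (hypothetical, no Spark involved).
class MapPartitionsModel {
    // Counts how many times the user function is invoked.
    static int calls = 0;

    // Applies f once to each partition and keeps one output partition per input partition.
    static <T, U> List<List<U>> mapPartitions(List<List<T>> partitions,
                                              Function<Iterator<T>, Iterator<U>> f) {
        List<List<U>> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            calls++;
            List<U> out = new ArrayList<>();
            f.apply(partition.iterator()).forEachRemaining(out::add);
            result.add(out);
        }
        return result;
    }

    // Same logic as the increment example: add 1 to every element of a partition.
    static List<List<Integer>> incrementAll(List<List<Integer>> partitions) {
        return MapPartitionsModel.<Integer, Integer>mapPartitions(partitions, it -> {
            List<Integer> out = new ArrayList<>();
            while (it.hasNext()) {
                out.add(it.next() + 1);
            }
            return out.iterator();
        });
    }

    public static void main(String[] args) {
        // Assumed layout of the ten integers across two partitions.
        List<List<Integer>> parts = Arrays.asList(
                Arrays.asList(1, 2, 3, 4, 5),
                Arrays.asList(6, 7, 8, 9, 10));
        System.out.println(incrementAll(parts)); // two output partitions
        System.out.println(calls);               // the function ran once per partition
    }
}
```

In this model the function runs twice for ten elements, and the result still has two partitions, which mirrors the one-to-one mapping described earlier.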

As stated in the previous chapter, the mapPartitions transformation is really useful if the map phase needs to interact with an external system, for example to read from a database. Consider an RDD of 100 elements divided into three partitions. If, during the map phase, some data needs to be read from a database based on the value of each element, then a map transformation that opens a connection inside its function will create 100 connections to the database. With mapPartitions, only three connections are created, one per partition. A connection pool could also be used to handle such scenarios, but mapPartitions provides this advantage implicitly. In the following example, we are reading rows from a MySQL table based on every element of the RDD:

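intRDD.mapPartitions(iterator -> {
    Class.forName("com.mysql.jdbc.Driver");
    List<String> names = new ArrayList<>();
    // One connection and one statement are opened per partition and closed when the block exits.
    try (Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
         Statement stmt = con.createStatement()) {
        while (iterator.hasNext()) {
            // The elements are integers, so concatenating them into the query is safe here.
            try (ResultSet rs = stmt.executeQuery("select name from emp where empId = " + iterator.next())) {
                // next() works on forward-only result sets; the column label matches the select list.
                if (rs.next()) {
                    names.add(rs.getString("name"));
                }
            }
        }
    }
    return names.iterator();
});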

Since mapPartitions is being used, we can iterate over all elements of a partition using the same connection. Had it been a map transformation, one connection to MySQL per element would have been created.
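
The 100-versus-3 arithmetic above can be checked with a small plain-Java simulation. This is only a sketch: FakeConnection is a hypothetical stand-in that counts how often it is constructed, the partitions helper is an assumed way of slicing 100 elements into three parts, and no Spark or JDBC code is involved:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java simulation (hypothetical names) of connection counts per element vs per partition.
class ConnectionCountDemo {

    // Stand-in for a database connection; it only counts how many times it is opened.
    static class FakeConnection {
        static int opened = 0;

        FakeConnection() {
            opened++;
        }

        String lookup(int id) {
            return "name-" + id;
        }
    }

    // Splits the integers 1..n into numPartitions contiguous slices.
    static List<List<Integer>> partitions(int n, int numPartitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            int start = p * n / numPartitions;
            int end = (p + 1) * n / numPartitions;
            List<Integer> part = new ArrayList<>();
            for (int i = start; i < end; i++) {
                part.add(i + 1);
            }
            parts.add(part);
        }
        return parts;
    }

    // map-style processing: a new connection is opened for every element.
    static int connectionsPerElement(List<List<Integer>> parts) {
        FakeConnection.opened = 0;
        for (List<Integer> part : parts) {
            for (int id : part) {
                new FakeConnection().lookup(id);
            }
        }
        return FakeConnection.opened;
    }

    // mapPartitions-style processing: one connection is shared by all elements of a partition.
    static int connectionsPerPartition(List<List<Integer>> parts) {
        FakeConnection.opened = 0;
        for (List<Integer> part : parts) {
            FakeConnection con = new FakeConnection();
            Iterator<Integer> it = part.iterator();
            while (it.hasNext()) {
                con.lookup(it.next());
            }
        }
        return FakeConnection.opened;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = partitions(100, 3);
        System.out.println(connectionsPerElement(parts));   // 100
        System.out.println(connectionsPerPartition(parts)); // 3
    }
}
```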

The mapPartitions transformation also provides an overload that takes a Boolean parameter, which decides whether the partitioning information should be preserved or not. The following is the Scala declaration of this overload:

def mapPartitions[U](f: FlatMapFunction[JIterator[T], U], preservesPartitioning: Boolean)

As per the Spark documentation, the value of this parameter should be false unless mapPartitions is being performed on a pair RDD and the input function does not modify the keys.

Partitioning information of an RDD is stored in the RDD object itself: an RDD holds a reference to an instance of the Partitioner class, if it has one. This information is lost after a map transformation is performed, which seems logical as the value of each element is being transformed. This holds true for pair RDDs as well, because Spark does not preserve partitioning information by default. The mapPartitions transformation creates an instance of the org.apache.spark.rdd.MapPartitionsRDD class, to which the value of preservesPartitioning is passed as a constructor parameter; its default value is false.

The map transformation also instantiates org.apache.spark.rdd.MapPartitionsRDD with the default value of preservesPartitioning, which is false. On the other hand, a filter transformation instantiates MapPartitionsRDD with preservesPartitioning set to true, so partitioning information remains preserved in the case of a filter transformation.
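
The rule described here can be pictured with a toy model in plain Java. This is a sketch under stated assumptions, not Spark code: ToyRDD is a hypothetical class that tracks nothing but an optional partitioner name, and its methods only mimic how the flag decides whether the parent's partitioner is kept:

```java
import java.util.Optional;

// Toy model (hypothetical, no Spark) of how preservesPartitioning affects the partitioner.
class PartitionerModel {

    static class ToyRDD {
        // Name of the partitioner, or empty when no partitioning information is known.
        final Optional<String> partitioner;

        ToyRDD(Optional<String> partitioner) {
            this.partitioner = partitioner;
        }

        // Keeps the parent's partitioner only when the flag is true.
        ToyRDD mapPartitions(boolean preservesPartitioning) {
            return new ToyRDD(preservesPartitioning ? partitioner : Optional.empty());
        }

        // map may change the elements, so the flag is left at false.
        ToyRDD map() {
            return mapPartitions(false);
        }

        // filter never changes the elements, so the flag is set to true.
        ToyRDD filter() {
            return mapPartitions(true);
        }
    }

    public static void main(String[] args) {
        ToyRDD partitioned = new ToyRDD(Optional.of("hash"));
        System.out.println(partitioned.map().partitioner.isPresent());    // false
        System.out.println(partitioned.filter().partitioner.isPresent()); // true
    }
}
```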
