Prerequisites

The following are the prerequisites for preparing the Spark application:

  • IDE for Java applications: We will use the Eclipse IDE to develop the Spark application, but any IDE can be used. The IDE should have the Maven plugin installed (recent Eclipse releases such as Mars and Neon include a built-in Maven plugin).
  • Java 7 or above, but preferably Java 8.
  • The Maven package.

The following are the steps to get started with the Maven project in a Java IDE:

  1. The first step is to create a Maven project. In Eclipse, it can be created using File|New|Project.
  2. Then search for Maven in the wizard list.
  3. Select Maven Project and click Next. On the next screen, select the checkbox next to Create a simple project (skip the archetype selection) and click Next. On the following screen, add the Group Id and Artifact Id values (for example, com.spark.examples and ApacheSparkForJavaDevelopers, as used in the mvn command in the next step) and then click Finish.
  4. Eclipse will create a Maven project for you. Maven projects can also be created using the mvn command-line tool as follows:
mvn archetype:generate -DgroupId=com.spark.examples -DartifactId=ApacheSparkForJavaDevelopers
Once the project is created using the mvn command-line tool, it needs to be imported into the IDE.
  5. After creating the Maven project, expand it and edit pom.xml by adding the following dependency inside the <dependencies> element:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
  6. The following are the contents of the pom.xml file:
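The full pom.xml listing is not reproduced in this text. As a rough sketch only, a file consistent with the steps above might look like the following. The groupId and artifactId are taken from the mvn command shown earlier; the project version and the compiler plugin version (3.5.1) are assumptions, not values from the original project.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.spark.examples</groupId>
  <artifactId>ApacheSparkForJavaDevelopers</artifactId>
  <!-- Assumed project version; use whatever your project requires -->
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Sets the Java source/target level used for compilation -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <!-- Assumed plugin version -->
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```

Changing the source and target values is how the Java version would be adjusted for a different requirement.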

The following are some useful tips about compilation of the Maven project:

  • The Maven compiler plugin is added to set the Java version used for compiling the project. It is currently set to 1.8 so that Java 8 features can be used. The Java version can be changed as required.
  • Hadoop does not need to be installed on your system to run Spark jobs. However, in certain scenarios (typically on Windows) you might get an error such as Failed to locate the winutils binary in the hadoop binary path, because Spark searches for a binary file called winutils in the bin folder of HADOOP_HOME.

The following steps may be required to fix such an error:

  1. Download the winutils executable (winutils.exe) from any repository.
  2. Create a dummy directory in which to place the downloaded winutils.exe, for example, E:\SparkForJavaDev\Hadoop\bin.
  3. Add the environment variable HADOOP_HOME that points to E:\SparkForJavaDev\Hadoop using either of the options given here:
    • In Eclipse, right-click the class that can be run as a Java application (the one containing the static main method), select Run As|Run Configurations, and add the variable on the Environment tab.
    • Add the following line of code in the Java main class itself:
System.setProperty("hadoop.home.dir", "PATH_OF_HADOOP_HOME");
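As a minimal sketch of the second option, the property can be set defensively so that a value already supplied elsewhere is not overwritten. The class and method names here (HadoopHomeSetup, configure) are hypothetical, and the path is a placeholder. This code typically needs to run before the JavaSparkContext is created.

```java
import java.io.File;

class HadoopHomeSetup {

    // Hypothetical helper: sets hadoop.home.dir only if it is not already set,
    // and returns the location where winutils.exe is expected to be.
    static File configure(String hadoopHome) {
        if (System.getProperty("hadoop.home.dir") == null) {
            System.setProperty("hadoop.home.dir", hadoopHome);
        }
        String effectiveHome = System.getProperty("hadoop.home.dir");
        return new File(new File(effectiveHome, "bin"), "winutils.exe");
    }

    public static void main(String[] args) {
        // Placeholder path; point it at your own directory.
        File winutils = configure("PATH_OF_HADOOP_HOME");
        if (!winutils.isFile()) {
            System.err.println("winutils.exe not found at " + winutils.getAbsolutePath());
        }
    }
}
```

Checking for the file up front gives a clearer message than the winutils error raised later during job execution.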

The next step is to create our Java class for the Spark application. The following is the SparkWordCount application using Java 7:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

public class SparkWordCount {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
    JavaRDD<String> inputData = javaSparkContext.textFile("path_of_input_file");
    JavaPairRDD<String, Integer> flattenPairs =
        inputData.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
          @Override
          public Iterator<Tuple2<String, Integer>> call(String text) throws Exception {
            List<Tuple2<String, Integer>> tupleList = new ArrayList<>();
            for (String word : text.split(" ")) { // one (word, 1) pair per word
              tupleList.add(new Tuple2<String, Integer>(word, 1));
            }
            return tupleList.iterator();
          }
        });
    JavaPairRDD<String, Integer> wordCountRDD =
        flattenPairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2; // sum the counts for each word
          }
        });
    wordCountRDD.saveAsTextFile("path_of_output_file");
  }
}

The following is the SparkWordCount application using Java 8:

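import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkWordCount {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
    JavaRDD<String> inputData = javaSparkContext.textFile("path_of_input_file");
    // Split each line on spaces and emit one (word, 1) pair per word
    JavaPairRDD<String, Integer> flattenPairs =
        inputData.flatMapToPair(text -> Arrays.asList(text.split(" ")).stream()
            .map(word -> new Tuple2<String, Integer>(word, 1)).iterator());
    // Sum the counts for each word
    JavaPairRDD<String, Integer> wordCountRDD = flattenPairs.reduceByKey((v1, v2) -> v1 + v2);
    wordCountRDD.saveAsTextFile("path_of_output_file");
  }
}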

In the preceding example of the WordCount application, we have performed the following steps:

  1. Initialize SparkContext: We started by creating a SparkConf object:
new SparkConf().setMaster("local").setAppName("WordCount");

Here, setMaster("local") specifies that the application will run inside a single JVM; that is, we can execute it in the IDE itself. This is very useful for debugging Spark applications and for writing unit tests. To run a Spark application on a Spark cluster, setMaster() should point to the Spark Master URL. (Note: we will learn more about it in detail in Chapter 6, Spark On Cluster.)

setAppName("WordCount") gives our Spark application a name. In this case, we have named the application WordCount, but any name can be used as required.

All other configuration parameters can also be set using the SparkConf object. With the help of the SparkConf object, we then initialized the JavaSparkContext:

new JavaSparkContext(conf);

JavaSparkContext is specifically for Java Spark applications. As described in previous chapters, SparkContext is the mandatory component of a Spark application, and one Spark application can contain only one SparkContext. With the help of SparkContext, we can create RDDs and build a DAG of RDD transformations.

  2. Load the input data: Using javaSparkContext, we loaded the data from a text file:
javaSparkContext.textFile("path_of_input_file");

There are other ways of loading the data as well; in this case, we used a text file as input. It is advisable to provide an absolute path, because a relative path is resolved against the location from which the program is executed.
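One way to follow the absolute-path advice is to resolve the path explicitly before handing it to textFile(). The sketch below uses only the standard java.nio.file API. The class name InputPathSketch and the file name data/input.txt are hypothetical and used purely for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class InputPathSketch {

    // Resolves a possibly relative path against the current working directory.
    static String toAbsolute(String path) {
        Path absolute = Paths.get(path).toAbsolutePath().normalize();
        return absolute.toString();
    }

    public static void main(String[] args) {
        // "data/input.txt" is a hypothetical file name.
        String inputPath = toAbsolute("data/input.txt");
        System.out.println("Path to pass to textFile(): " + inputPath);
    }
}
```

Printing the resolved path makes it obvious which file the job will actually read when it is launched from a different directory.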

Also, we can create an RDD by executing the parallelize() function on SparkContext as follows:

JavaRDD<String> data = javaSparkContext.parallelize(Arrays.asList("where there is a will there is a way"));

  3. Flatten the input data and create tuples: Once the data was loaded, we flattened it and converted it into tuples, that is, <key,value> pairs, using the flatMapToPair transformation:
inputData.flatMapToPair(text -> Arrays.asList(text.split(" ")).stream().map(word -> new Tuple2<String,Integer>(word,1)).iterator())

Consider the following content of the text file: where there is a will there is a way

The flatMapToPair transformation first breaks the text into words (in our example, we assume that the words are separated by a single space, " ") and then creates the tuples as follows:

(where,1)
(there,1)
(is,1)
(a,1)
(will,1)
(there,1)
(is,1)
(a,1)
(way,1)
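Note that splitting on a single space keeps case and punctuation, so an input such as "Where there is a will there is a way." would produce Where and way. as keys. The following plain-Java sketch (no Spark involved) shows one way to normalize words before pairing them. The class name WordPairsSketch is hypothetical, and the lowercasing and stripping of non-letters are assumptions added for illustration, not part of the original application.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

class WordPairsSketch {

    // Plain-Java stand-in for the flatMapToPair step: one (word, 1) pair per word.
    // Words are lowercased and stripped of non-letters before pairing.
    static List<Map.Entry<String, Integer>> toPairs(String text) {
        return Arrays.stream(text.split("\\s+"))
                .map(word -> word.toLowerCase(Locale.ROOT).replaceAll("[^a-z]", ""))
                .filter(word -> !word.isEmpty())
                .<Map.Entry<String, Integer>>map(word -> new SimpleEntry<>(word, 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> pair : toPairs("Where there is a will there is a way.")) {
            System.out.println("(" + pair.getKey() + "," + pair.getValue() + ")");
        }
    }
}
```

The same normalization could be applied inside the flatMapToPair lambda of the Spark application if case-insensitive counts are wanted.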
  4. Aggregate (sum) the values for every key: Once the data is transformed into tuples as described previously, another transformation, reduceByKey, is executed to sum the values for every key, which gives the number of occurrences (frequency) of every word in the data:
flattenPairs.reduceByKey((v1, v2) -> v1+v2);

So the preceding data will be reduced as follows:

(where,(1)) -> (where,1)
(there,(1+1)) -> (there,2)
(is,(1+1)) -> (is,2)
(a,(1+1)) -> (a,2)
(will,(1)) -> (will,1)
(way,(1)) -> (way,1)

This gives us the required result. When running on a cluster, the RDD is partitioned and records with identical keys are shuffled to the same executor, so this transformation involves network traffic as well (we will learn about partitioning concepts in Chapter 7, Spark Programming Model-Advanced).
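To see what the merge function does without a cluster, the reduction can be imitated in plain Java. This is only an analogy: it runs in a single JVM with no partitioning or shuffle, and the class name ReduceByKeySketch is hypothetical. It applies the same (v1, v2) -> v1 + v2 function to values that share a key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ReduceByKeySketch {

    // Plain-Java stand-in for reduceByKey((v1, v2) -> v1 + v2):
    // values that share a key are merged pairwise with the sum function.
    static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : text.split(" ")) {
            counts.merge(word, 1, (v1, v2) -> v1 + v2);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords("where there is a will there is a way");
        counts.forEach((word, count) -> System.out.println("(" + word + "," + count + ")"));
    }
}
```

Because the function is associative and commutative, Spark is free to apply it in any order across partitions, which is not something this single-threaded loop demonstrates.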

  5. Run the final action: As described in previous chapters, transformations are lazy operations. The DAG of transformations is executed only once an action is called, so the final step is to execute an action. In this example, we executed saveAsTextFile, which writes the output as text files under the given path:
wordCountRDD.saveAsTextFile("path_of_output_file");
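Java 8 streams behave in a loosely similar way, which can help build intuition for lazy evaluation. The sketch below is an analogy only and does not use Spark: intermediate operations such as map() stand in for transformations, and the terminal operation stands in for an action. The class name LazyPipelineSketch and the call counter are hypothetical helpers for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineSketch {

    // Counts how many times the mapping function has actually run.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Builds the pipeline only; like a Spark transformation, map() does no work yet.
    static Stream<String> buildPipeline(List<String> words) {
        return words.stream().map(word -> {
            CALLS.incrementAndGet();
            return word.toUpperCase(Locale.ROOT);
        });
    }

    public static void main(String[] args) {
        Stream<String> pipeline = buildPipeline(Arrays.asList("where", "there", "is"));
        System.out.println("Calls before terminal operation: " + CALLS.get());
        // The terminal operation plays the role of a Spark action and triggers the work.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("Calls after terminal operation: " + CALLS.get() + " " + result);
    }
}
```

In the Spark application, removing the saveAsTextFile call would likewise leave the flatMapToPair and reduceByKey steps defined but never executed.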

In this section, we discussed the classical word count problem and learned to solve it using Spark and Java. In the process, we also set up the Eclipse IDE to run the Spark job.

In the next section, we will discuss some common RDD transformations and briefly touch upon narrow and wide dependencies of RDD operations.
