Submitting jobs to the Spark cluster (local)

There are multiple components involved in running Spark in distributed mode. In the self-contained application mode (the main program that we have run throughout this book so far), all of these components run on a single JVM. The following diagram illustrates the various components and their functions in running a Scala program in distributed mode:

[Figure: The components involved in running a Spark program in distributed mode]

As a first step, the RDD graph that we construct using the various operations on our RDDs (map, filter, join, and so on) is passed to the Directed Acyclic Graph (DAG) scheduler. The DAG scheduler optimizes the flow and converts all the RDD operations into groups of tasks called stages. Generally, all operations before a shuffle are wrapped into a single stage. Consider operations in which there is a one-to-one mapping between input and output elements; for example, a map or a filter operator processes each element independently. If a map on an RDD is followed by a filter, the two are generally pipelined to form a single task that can be executed by a single worker, which also preserves data locality. Relating this to traditional Hadoop MapReduce, where data is written to disk after every stage, helps us really appreciate the Spark lineage graph.
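
To make this concrete, here is a minimal sketch, assuming an existing SparkContext named sc and an illustrative input path. The flatMap, filter, and map below are pipelined into a single stage, while reduceByKey introduces a shuffle and therefore a stage boundary:

// Hypothetical word-count-style pipeline, assuming an existing SparkContext (sc)
val lines = sc.textFile("data/sample.txt")   // illustrative path

val wordCounts = lines
  .flatMap(line => line.split("\\s+"))       // narrow: pipelined
  .filter(word => word.nonEmpty)             // narrow: pipelined with the flatMap
  .map(word => (word, 1))                    // narrow: still the same stage
  .reduceByKey(_ + _)                        // wide: shuffle, so a new stage begins

wordCounts.take(5).foreach(println)          // the action hands the graph to the DAG scheduler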

These shuffle-separated stages are then passed to the task scheduler, which splits them into tasks and submits them to the cluster manager. Spark comes bundled with a simple cluster manager that can receive these tasks and run them against a set of worker nodes. However, Spark applications can also be run on popular cluster managers, such as Mesos and YARN.

With YARN or Mesos, we can run multiple executors on the same worker node. Moreover, YARN and Mesos can host non-Spark jobs on the same cluster alongside Spark jobs.

In the Spark standalone cluster, prior to Spark 1.4, the number of executors per worker node per application was limited to one. However, we could increase the number of worker instances per node using the SPARK_WORKER_INSTANCES parameter. With Spark 1.4 (https://issues.apache.org/jira/browse/SPARK-1706), we can run multiple executors for the same application on a single worker node, just as in Mesos/YARN.

Note

If we intend to run multiple worker instances on a single machine, we must ensure that we configure the SPARK_WORKER_CORES property to limit the number of cores that each worker can use. The default is to use all the available cores!
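
As a minimal sketch, such settings go into conf/spark-env.sh before the workers are started; the values below are purely illustrative:

# conf/spark-env.sh (illustrative values)
export SPARK_WORKER_INSTANCES=2   # run two worker instances on this machine
export SPARK_WORKER_CORES=2       # cap each worker at two cores
export SPARK_WORKER_MEMORY=2g     # optionally cap the memory available to each worker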

In this recipe, we will deploy the Spark application on a standalone cluster running on a single machine. For all the recipes in this chapter, we'll use the binary classification app that we built in the previous chapter as the deployment candidate. This recipe assumes that you have some knowledge of HDFS concepts and its basic operations.

How to do it...

Submitting a Spark job to the local cluster involves the following steps:

  1. Downloading Spark.
  2. Running HDFS in pseudo-distributed mode.
  3. Running the Spark master and slave locally.
  4. Pushing data into HDFS.
  5. Submitting the Spark application on the cluster.

Downloading Spark

Throughout this book, we have been using Spark version 1.4.1, as we can see in our build.sbt. Now, let's head over to the download page (https://spark.apache.org/downloads.html) and download the spark-1.4.1-bin-hadoop2.6.tgz bundle, as shown here:

[Screenshot: The Spark downloads page with the spark-1.4.1-bin-hadoop2.6.tgz package selected]
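
Once the download completes, extracting the bundle gives us the Spark home directory that the rest of this recipe works from (the commands below assume the archive is in the current directory):

tar -xzf spark-1.4.1-bin-hadoop2.6.tgz
cd spark-1.4.1-bin-hadoop2.6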

Running HDFS in pseudo-distributed mode

Instead of loading the file from the local filesystem for our Spark application, let's store the file in HDFS. In order to do this, let's bring up a locally running pseudo-distributed cluster (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Pseudo-Distributed_Operation) of Hadoop 2.6.0.

After formatting our name node using bin/hdfs namenode -format and bringing up our name node and data node using sbin/start-dfs.sh, let's confirm that all the processes we need are running properly. We do this using jps. The following screenshot shows what you are expected to see once you start the DFS daemons:

[Screenshot: jps output after starting the HDFS daemons]
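
To recap, these are the commands, run from the Hadoop 2.6.0 installation directory, along with the daemons that jps should report for a default pseudo-distributed setup:

bin/hdfs namenode -format   # one-time format of the name node
sbin/start-dfs.sh           # brings up the HDFS daemons

jps                         # should list NameNode, DataNode, and SecondaryNameNode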

Running the Spark master and slave locally

In order to submit our assembly JAR to a Spark cluster, we have to first bring up the Spark master and worker nodes.

All that we need to do to run Spark on the local machine is go to the downloaded (and extracted) Spark folder and run sbin/start-all.sh from the Spark home directory. This brings up a Master and a Worker node of Spark. The Master's web UI is accessible on port 8080; we use this port to check the status of our jobs. The default service port of the Master is 7077; we'll use this port to submit our assembly JAR as a job to the Spark cluster.

[Screenshot: The Spark Master web UI on port 8080]
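
Putting this together, the sequence from the Spark home directory looks like the following (assuming everything runs on localhost):

sbin/start-all.sh   # starts one Master and one Worker on this machine

# Master web UI (job and worker status): http://localhost:8080
# Master service URL (used when submitting jobs): spark://localhost:7077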

Let's confirm that the Master and the Worker nodes are running using jps:

[Screenshot: jps output showing the Master and Worker processes]

Pushing data into HDFS

This just involves running the mkdir and put commands on HDFS:

bash-3.2$ hadoop fs -mkdir /scalada

bash-3.2$ hadoop fs -put /Users/Gabriel/Apps/SMSSpamCollection /scalada/

bash-3.2$ hadoop fs -ls /scalada

Found 1 items
-rw-r--r--   1 Gabriel supergroup     477907 2015-07-18 16:59 /scalada/SMSSpamCollection

We can also confirm this through the HDFS web interface on port 50070 by going to Utilities | Browse the file system, as shown here:

[Screenshot: Browsing the /scalada directory through the HDFS web interface]

Submitting the Spark application on the cluster

Before we submit the Spark application to be run against the local cluster, let's change the classification program (BinaryClassificationSpam) to point to the HDFS location:

// Read the dataset from HDFS instead of the local filesystem.
// Each line of SMSSpamCollection is tab-separated: the label first, then the message text.
val docs = sc.textFile("hdfs://localhost:9000/scalada/SMSSpamCollection").map(line => {
    val words = line.split("\t")
    Document(words.head.trim(), words.tail.mkString(" "))
  })

By default, Spark 1.4.1 is built against Hadoop 2.2.0. Since we are now running the job on Hadoop 2.6.0 and using the Spark binary prebuilt for Hadoop 2.6 and later, let's change build.sbt to reflect that:

libraryDependencies  ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
  "com.databricks" %% "spark-csv" % "1.0.3",
  "org.apache.hadoop"  % "hadoop-client" % "2.6.0",
  ("org.scalanlp" % "epic-parser-en-span_2.10" % "2015.2.19").
    exclude("xml-apis", "xml-apis")
)
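
Since the next step builds an assembly (uber) JAR, the project also needs the sbt-assembly plugin. If it isn't already in place, a typical project/plugins.sbt entry looks like the following; the plugin version shown is only an example, so pick one that is compatible with your sbt version:

// project/plugins.sbt (illustrative)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")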

Run sbt clean assembly to build the Uber JAR, like this:

[Screenshot: Output of sbt clean assembly]

Now, let's submit the assembly JAR to our local cluster using spark-submit:
./bin/spark-submit \
  --class com.packt.scalada.learning.BinaryClassificationSpam \
  --master spark://localhost:7077 \
  --executor-memory 2G \
  --total-executor-cores 2 \
  <project root>/target/scala-2.10/scalada-learning-assembly.jar

The following screenshot shows the output, confirming that we have successfully run our classification job on a Spark cluster, as opposed to the standalone app that we used in the previous chapter:

[Screenshot: Output of the classification job on the Spark cluster]