Parallel execution involves splitting the workload into subtasks that are executed in different threads or on different nodes. Let's see how Spark does this and how it manages execution and communication between the subtasks.
Spark workload splitting is determined by the number of partitions of a Resilient Distributed Dataset (RDD), the basic abstraction in Spark, and by the pipeline structure. An RDD represents an immutable, partitioned collection of elements that can be operated on in parallel. While the specifics might depend on the mode in which Spark runs, the following diagram captures the Spark task/resource scheduling:
You will find that Spark, as well as Hadoop, has a lot of parameters. Some of them are specified as environment variables (refer to the $SPARK_HOME/conf/spark-env.sh file), and some can be given as command-line parameters. Moreover, some files with predefined names can contain parameters that change the Spark behavior, such as core-site.xml. This might be confusing, and I will cover as much as possible in this and the following chapters. If you are working with Hadoop Distributed File System (HDFS), then the core-site.xml and hdfs-site.xml files will contain the pointer to and the specifications for the HDFS master. The requirement for picking up these files is that they have to be on the CLASSPATH of the Java process, which, again, may be set by specifying either the HADOOP_CONF_DIR or the SPARK_CLASSPATH environment variable. As is usual with open source, you sometimes need to grep the code to understand how various parameters work, so having a copy of the source tree on your laptop is a good idea.
Each node in the cluster can run one or more executors, and each executor can schedule a sequence of tasks to perform the Spark operations. The Spark driver is responsible for scheduling the execution and works with a cluster scheduler, such as Mesos or YARN, to allocate the available resources. The Spark driver usually runs on the client machine, but in the latest release, it can also run in the cluster under the cluster manager. YARN and Mesos have the ability to dynamically manage the number of executors that run concurrently on each node, subject to the resource constraints.
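For example, here is a minimal, hedged sketch of the settings that enable dynamic executor management under YARN or Mesos (the executor counts are purely illustrative, and the external shuffle service must be available on the nodes):

import org.apache.spark.SparkConf

val dynConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")      // let the cluster manager grow and shrink the executor pool
  .set("spark.dynamicAllocation.minExecutors", "1")    // illustrative lower bound
  .set("spark.dynamicAllocation.maxExecutors", "16")   // illustrative upper bound
  .set("spark.shuffle.service.enabled", "true")        // required so that executors can be removed safely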
In the Standalone mode, the Spark Master does the work of the cluster scheduler; it might be less efficient in allocating resources, but it is better than nothing in the absence of a preconfigured Mesos or YARN. The standard Spark distribution contains shell scripts to start Spark in Standalone mode in the sbin directory. The Spark Master and the driver communicate directly with one or several Spark workers that run on individual nodes. Once the master is running, you can start the Spark shell with the following command:
$ bin/spark-shell --master spark://<master-address>:7077
Note that you can always run Spark in local mode, which means that all tasks are executed in a single JVM, by specifying --master local[n], where n is the number of threads to use and should be at least 2. In fact, we will be using the local mode very often in this book for running small examples.
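As a minimal sketch of such a small example (run from bin/spark-shell --master local[2], where the SparkContext is available as sc), the number of RDD partitions mentioned earlier determines how many tasks are created for each stage:

val rdd = sc.parallelize(1 to 1000, 8)     // request 8 partitions explicitly
println(rdd.partitions.length)             // 8: one task per partition in each stage
println(rdd.map(_ * 2).reduce(_ + _))      // 1001000, computed in parallel across the partitions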
The Spark shell is an application from the Spark point of view. Once you start a Spark application, you will see it under Running Applications in the Spark Master UI (or in the corresponding cluster manager), which can redirect you to the Spark application HTTP UI on port 4040, where one can see the subtask execution timeline and other important properties such as the environment settings, the classpath, the parameters passed to the JVM, and information on resource usage (refer to Figure 3-3):
As we saw, with Spark, one can easily switch between the local and cluster modes by providing the --master command-line option, setting the MASTER environment variable, modifying spark-defaults.conf (which should be on the classpath during execution), or even by setting the options explicitly using the setter methods on the SparkConf object directly in Scala (a short sketch follows the table below), which will be covered later:
Cluster Manager | MASTER env variable | Comments
---|---|---
Local (single node, multiple threads) | local[n] | n is the number of threads to use and should be greater than or equal to 2. If you want Spark to communicate with other Hadoop tools such as Hive, you still need to point it to the cluster by setting either the HADOOP_CONF_DIR or the SPARK_CLASSPATH environment variable.
Standalone (daemons running on the nodes) | spark://<master-address>:7077 | This has a set of start/stop scripts in the sbin directory.
Mesos | mesos://<host>:5050, or mesos://zk://<host>:2181 for a multimaster setup | Here, you need to set the MASTER variable to the Mesos master (or to the ZooKeeper quorum in the multimaster case).
YARN | yarn-client or yarn-cluster | The Spark driver can run either in the cluster or on the client node, which is managed by this value. Set HADOOP_CONF_DIR (or YARN_CONF_DIR) to point to the YARN configuration files.
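As for the last option above, here is a minimal sketch of configuring the master and a couple of other settings programmatically through SparkConf (the application name and the memory value are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Chapter03Example")        // an illustrative application name
  .setMaster("local[2]")                 // or "spark://<master-address>:7077", "yarn-client", and so on
  .set("spark.executor.memory", "1g")    // an illustrative resource setting
val sc = new SparkContext(conf)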
The most commonly used ports are 8080, the Standalone Master web UI, and 4040, the application UI. Other Spark ports are summarized in the following tables:
Standalone ports:

From | To | Default Port | Purpose | Configuration Setting
---|---|---|---|---
Browser | Standalone Master | 8080 | Web UI | spark.master.ui.port / SPARK_MASTER_WEBUI_PORT
Browser | Standalone Worker | 8081 | Web UI | spark.worker.ui.port / SPARK_WORKER_WEBUI_PORT
Driver / Standalone Worker | Standalone Master | 7077 | Submit job to cluster / Join cluster | SPARK_MASTER_PORT
Standalone Master | Standalone Worker | (random) | Schedule executors | SPARK_WORKER_PORT
Executor / Standalone Master | Driver | (random) | Connect to application / Notify executor state changes | spark.driver.port
Other ports:

From | To | Default Port | Purpose | Configuration Setting
---|---|---|---|---
Browser | Application | 4040 | Web UI | spark.ui.port
Browser | History Server | 18080 | Web UI | spark.history.ui.port
Driver | Executor | (random) | Schedule tasks | spark.executor.port
Executor | Driver | (random) | File server for files and jars | spark.fileserver.port
Executor | Driver | (random) | HTTP broadcast | spark.broadcast.port
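If any of the defaults clash with other services on your nodes, most of these ports can be overridden; here is a minimal, hedged sketch of fixing two of them through SparkConf (the port values are purely illustrative, and the same properties can also be placed in spark-defaults.conf):

import org.apache.spark.SparkConf

val portConf = new SparkConf()
  .set("spark.ui.port", "4041")          // move the application web UI off the default 4040
  .set("spark.driver.port", "51000")     // a fixed driver port, useful behind restrictive firewalls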
Also, some of the documentation is available with the source distribution in the docs subdirectory, but it may be out of date.
Since the emergence of Spark, multiple applications that benefit from Spark's ability to cache RDDs have been written: Shark, Spork (Pig on Spark), graph libraries (GraphX, GraphFrames), streaming, MLlib, and so on; some of these will be covered here and in later chapters.
In this section, I will cover the major architecture components used to collect, store, and analyze data in Spark. While I covered a more complete data life cycle architecture in Chapter 2, Data Pipelines and Modeling, here are the Spark-specific components:
All of these are different ways to reliably move data from one place to another without loss or duplication. They usually implement a publish-subscribe model, where multiple writers and readers can write to and read from the same queues with different guarantees. Flume stands out as one of the first distributed log and event management implementations, but it is slowly being replaced by Kafka, a fully functional publish-subscribe distributed message queue, developed at LinkedIn, that can optionally persist messages across a distributed set of nodes. We covered Flume and Kafka briefly in the previous chapter. Flume configuration is file-based and is traditionally used to deliver messages from a Flume source to one or several Flume sinks. One of the popular sources is netcat, which listens on raw data over a port. For example, the following configuration describes an agent receiving data and then writing it to HDFS, rolling the file every 30 seconds (the default):
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4987

# Describe the sink (the instructions to configure and start HDFS are provided in the Appendix)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:8020/flume/netcat/data
a1.sinks.k1.hdfs.filePrefix = chapter03.example
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.writeFormat = Text

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
This file is included as part of the code provided with this book in the chapter03/conf directory. Let's download and start the Flume agent (check the MD5 sum against the one provided at http://flume.apache.org/download.html):
$ wget http://mirrors.ocf.berkeley.edu/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
$ md5sum apache-flume-1.6.0-bin.tar.gz
MD5 (apache-flume-1.6.0-bin.tar.gz) = defd21ad8d2b6f28cc0a16b96f652099
$ tar xf apache-flume-1.6.0-bin.tar.gz
$ cd apache-flume-1.6.0-bin
$ ./bin/flume-ng agent -Dlog.dir=. -Dflume.log.level=DEBUG,console -n a1 -f ../chapter03/conf/flume.conf
Info: Including Hadoop libraries found via (/Users/akozlov/hadoop-2.6.4/bin/hadoop) for HDFS access
Info: Excluding /Users/akozlov/hadoop-2.6.4/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /Users/akozlov/hadoop-2.6.4/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar from classpath
...
Now, in a separate window, you can run the netcat command to send text to the Flume agent:
$ nc localhost 4987
Hello
OK
World
OK
...
The Flume agent will first create a *.tmp file and then rename it to a file without the extension (the file extension can be used to filter out the files that are still being written to):
$ bin/hdfs dfs -text /flume/netcat/data/chapter03.example.1463052301372
16/05/12 04:27:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1463052302380  Hello
1463052304307  World
Here, each row contains a Unix timestamp in milliseconds and the data received. In this case, we put the data into HDFS, from where it can be analyzed by a Spark/Scala program; we can exclude the files that are still being written to with the *.tmp filename pattern. However, if you are really interested in up-to-the-last-minute values, Spark, as well as some other platforms, supports streaming, which I will cover in a few sections.
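As a minimal, hedged sketch of such a batch analysis (assuming the SparkContext sc and the sink settings above, where the default hdfs.fileType of SequenceFile stores a timestamp key and a text value; the glob is just one way to skip the in-flight *.tmp files):

val events = sc.sequenceFile[Long, String]("hdfs://localhost:8020/flume/netcat/data/chapter03.example.*[0-9]")
events.take(10).foreach { case (ts, line) => println(s"$ts\t$line") }   // for example, 1463052302380  Hello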
HDFS, Cassandra, S3, and Tachyon are different ways to get the data into persistent storage and onto the compute nodes as necessary, with different guarantees. HDFS is a distributed storage implemented as part of Hadoop, which serves as the backend for many products in the Hadoop ecosystem. HDFS divides each file into blocks, which are 128 MB in size by default, and stores each block on multiple nodes (three by default). Although HDFS is reliable and supports HA, a general complaint about HDFS storage is that it is slow, particularly for machine learning purposes. Cassandra is a general-purpose key/value storage that also stores multiple copies of each row and can be configured to support different levels of consistency to optimize read or write speeds. The advantage of Cassandra over the HDFS model is that it does not have a central master node; reads and writes are completed based on a consensus algorithm. This, however, may sometimes affect Cassandra's stability. S3 is Amazon's storage: the data is stored off-cluster, which affects the I/O speed. Finally, the recently developed Tachyon claims to utilize node memory to optimize access to working sets across the nodes.
Additionally, new backends are constantly being developed, for example, Kudu from Cloudera (http://getkudu.io/kudu.pdf) and the Ignite File System (IGFS) from GridGain (http://apacheignite.gridgain.org/v1.0/docs/igfs). Both are open source and Apache-licensed.
As we mentioned before, Spark can run under different cluster resource schedulers. These are various implementations to schedule Spark's containers and tasks on the cluster. The schedulers can be viewed as cluster kernels, performing functions similar to the operating system kernel: resource allocation, scheduling, I/O optimization, application services, and UI.
Mesos is one of the original cluster managers and is built using the same principles as the Linux kernel, only at a different level of abstraction. A Mesos slave runs on every machine and provides APIs for resource management and scheduling across entire datacenter and cloud environments. Mesos is written in C++.
YARN is a more recent cluster manager, developed at Yahoo. Each node in YARN runs a Node Manager, which communicates with the Resource Manager, which may run on a separate node. The Resource Manager schedules the tasks to satisfy memory and CPU constraints. The Spark driver can run either in the cluster, which is called cluster mode for YARN, or on the client node, which is called client mode; in client mode, only the Spark executors run in the cluster, and the driver that schedules the Spark pipelines runs on the same machine that runs the Spark shell or the submit program. In this case, the Spark executors talk to the driver's host over a random open port. YARN is written in Java, with the consequence of unpredictable GC pauses, which might make the long tail of latencies fatter.
Finally, if none of these resource schedulers is available, the Standalone deployment mode starts an org.apache.spark.deploy.worker.Worker process on each node, which communicates with the Spark Master process running as org.apache.spark.deploy.master.Master. The worker process is completely managed by the master and can run multiple executors and tasks (refer to Figure 3-2).
In practical implementations, it is advisable to track the program's parallelism and required resources through the driver's UI and to adjust the parallelism and the available memory, increasing the parallelism if necessary. In the following section, we will start looking at how Scala, and Scala in Spark, addresses different problems.