The RDD is the core data structure of the Apache Spark architecture. RDDs store and preserve data, partitioned and distributed over multiple processors and servers, so that operations can be executed concurrently.
Data frames were added later to extend RDDs with SQL functionality. The original Apache Spark machine learning library, MLlib, relies on the lower-level RDD API. The more recent ML library allows data scientists to describe transformations and actions using SQL.
The introduction of the Hadoop ecosystem more than 10 years ago opened the door to large-scale data processing and analytics. The Hadoop framework relies on a very effective distributed filesystem, HDFS, suitable for processing a large number of files containing sequential data. However, this reliance on the distributed filesystem for managing data becomes a bottleneck when executing iterative computations such as dynamic programming and machine learning algorithms.
Apache Spark lays out an efficient, resilient memory management layer on top of HDFS. This design provides developers with a very significant performance improvement, tenfold to a hundredfold depending on the type of application.
Spark provides data engineers with a wide array of prebuilt transformations and actions inspired by the Scala API, so Scala developers can leverage their knowledge of the language. However, the Spark API is also available in Java and Python.
The performance of Spark relies on four core design principles [17:03]: in-memory persistence of RDDs, lazy execution of transformations, the set of transformations and actions applied to RDDs, and the implementation of shared variables.
The developer can decide to persist and/or cache an RDD for future use. An RDD may be persisted in memory only, on disk only, or in memory if available and on disk otherwise, as deserialized or serialized Java objects. For instance, an RDD, rdd, can be cached with serialization using a single statement, as shown in the following code:
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
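The benefit of persistence shows up when the same RDD feeds more than one action. The following is a minimal sketch, assuming an existing SparkContext sc and made-up input values; only the first action triggers the computation, while the second one reuses the cached partitions:
import org.apache.spark.storage.StorageLevel

val squares = sc.parallelize(Seq(0.12, 0.07, 0.31, 0.19))
  .map(v => v * v)                        // transformation recorded, not executed
  .persist(StorageLevel.MEMORY_ONLY_SER)  // storage level assigned, nothing cached yet

val count = squares.count()               // first action: computes and caches the partitions
val total = squares.reduce(_ + _)         // second action: reuses the cached partitions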
Scala supports lazy values natively. The initialization expression on the right-hand side of a lazy value assignment, whether it produces a value, an object reference, or the result of a method call, is evaluated only once, the first time the value is accessed, as shown in the following code:
class Pipeline {
  lazy val x = { println("x"); 1.5 }
  lazy val m = { println("m"); 3 }
  val n = { println("n"); 6 }
  def f = m << 1
  def g(j: Int) = Math.pow(x, j)
}
val pipeline = new Pipeline //1
pipeline.g(pipeline.f) //2
The order of the values printed is n, m, and then x. The instantiation of the Pipeline class initializes n, but neither m nor x (line 1). At a later stage, the g method is called with the result of the f method as its argument. Evaluating f initializes the value m it needs, and then g initializes x in order to raise it to the power m << 1 (line 2).
Spark applies the same principle to RDDs by executing a transformation only when an action is performed. In other words, Spark postpones memory allocation, parallelization, and computation until the driver code retrieves the result through the execution of an action. The chain of transformations leading to an action is resolved backwards by the directed acyclic graph (DAG) scheduler.
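The following is a minimal sketch of this behavior; the SparkContext sc, the input file quotes.csv, and its column layout are illustrative assumptions. None of the transformations is executed until the final count:
val raw = sc.textFile("quotes.csv")              // transformation: no file is read yet
val prices = raw.map(_.split(",")(1).toDouble)   // transformation: merely recorded in the DAG
val highs = prices.filter(_ > 100.0)             // transformation: still no execution
val numHighs = highs.count()                     // action: the DAG scheduler now executes all three steps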
Spark is implemented in Scala, so you should not be too surprised that the most relevant Scala higher-order methods on collections are supported in Spark. The first table describes the transformation methods in Spark, as well as their counterparts in the Scala standard library. We use the (K, V) notation for (key, value) pairs:
Spark | Scala | Description |
---|---|---|
map(f) | map | Transform an RDD by executing the function f on each element |
filter(f) | filter | Transform an RDD by selecting the elements for which the function f returns true |
flatMap(f) | flatMap | Transform an RDD by mapping each element to a sequence of output items |
mapPartitions(f) | | Execute the map method separately on each partition |
sample | | Sample a fraction of the data with or without replacement using a random generator |
groupByKey | groupBy | Called on (K, V) pairs to generate a new (K, Seq(V)) RDD |
union | union | Create a new RDD as the union of this RDD and the argument |
distinct | distinct | Eliminate duplicate elements from this RDD |
reduceByKey(f) | reduce | Aggregate or reduce the values corresponding to each key using the function f |
sortByKey | sortWith | Reorganize the (K, V) pairs in an RDD in ascending, descending, or otherwise specified order of the keys, K |
join | | Join an RDD (K, V) with an RDD (K, W) to generate a new RDD (K, (V, W)) |
cogroup | | Implement a join operation, but generate an RDD (K, (Seq(V), Seq(W))) |
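As an illustration, the following sketch chains a few of these transformations (together with mapValues, another Spark pair-RDD method) on a pair RDD; the SparkContext sc and the (ticker, price) pairs are illustrative assumptions:
val quotes = sc.parallelize(Seq(("IBM", 165.0), ("GE", 26.0), ("IBM", 167.5), ("GE", 25.5)))

val highQuotes = quotes.filter { case (_, price) => price > 100.0 }   // keep the expensive quotes
val byTicker = quotes.groupByKey()                                    // group all prices per ticker
val avgByTicker = quotes
  .mapValues(price => (price, 1))
  .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }    // per-key sum and count
  .mapValues { case (sum, n) => sum / n }                             // average price per ticker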
Action methods trigger the collection or the reduction of the datasets from all partitions back to the driver, as listed here:
Spark | Scala | Description |
---|---|---|
reduce(f) | reduce | Aggregate all the elements of the RDD across all the partitions and return a Scala object to the driver |
collect | collect | Collect and return all the elements of the RDD across all the partitions as a list in the driver |
count | count | Return the number of elements in the RDD to the driver |
first | head | Return the first element of the RDD to the driver |
take(n) | take(n) | Return the first n elements of the RDD to the driver |
takeSample | | Return an array of random elements from the RDD back to the driver |
saveAsTextFile | | Write the elements of the RDD as a text file in either the local filesystem or HDFS |
countByKey | | Generate a (K, Int) RDD with the original keys, K, and the count of values for each key |
foreach(f) | foreach | Execute the function f of type T => Unit on each element of the RDD |
Scala methods such as fold, find, drop, flatten, min, max, and sum are not currently implemented in Spark. Other Scala methods, such as zip, have to be used carefully, as there is no guarantee that the order of the elements of the two collections passed to zip is maintained between partitions.
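The following sketch applies a few of the actions listed previously; the SparkContext sc and the input values are illustrative assumptions:
val returns = sc.parallelize(Seq(0.02, -0.01, 0.005, 0.03), 2)

val n = returns.count()                 // number of elements across all partitions
val firstReturn = returns.first()       // first element, returned to the driver
val top2 = returns.take(2)              // first two elements as a local array
val all = returns.collect()             // every element, collected in the driver
val total = returns.reduce(_ + _)       // aggregation across all partitions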
In a perfect world, variables are immutable and local to each partition to avoid race conditions. However, there are circumstances where variables have to be shared without breaking the immutability provided by Spark. To this end, Spark duplicates shared variables and copies them to each partition of the dataset. Spark supports two types of shared variables: broadcast values, which distribute a read-only copy of the data to every partition, and accumulators, which the tasks running on the worker nodes can only add to and whose value is read back by the driver.
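The following is a minimal sketch of both types of shared variables; the SparkContext sc, the conversion rates, and the (currency, amount) pairs are illustrative assumptions:
val rates = sc.broadcast(Map("USD" -> 1.0, "EUR" -> 1.08))   // read-only copy shipped to each partition
val unknownCcy = sc.longAccumulator("unknownCcy")            // counter the worker tasks can only add to

val amounts = sc.parallelize(Seq(("EUR", 250.0), ("USD", 100.0), ("JPY", 5000.0)))
val inUsd = amounts.flatMap { case (ccy, amount) =>
  rates.value.get(ccy) match {
    case Some(rate) => Some(amount * rate)
    case None => unknownCcy.add(1L); None                    // record any currency missing from the table
  }
}
inUsd.count()                  // an action is required before the accumulator is actually updated
println(unknownCcy.value)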
The four design principles can be summarized in the following diagram:
The preceding diagram illustrates the most common interaction between the Spark driver and its workers, as listed in the following steps:
1. The driver program creates a SparkContext, which connects to the cluster manager.
2. The cluster manager allocates executors on the worker nodes.
3. The driver ships the application code and the tasks of each stage to the executors.
4. The executors run the tasks on their partitions and return the results to the driver or write them to storage.
Spark's in-memory computation enables the iterative computing found in the training of machine learning models and in the execution of dynamic programming or optimization algorithms. Spark runs on Windows, Linux, and macOS operating systems. It can be deployed either in local mode on a single host or in cluster mode for a distributed environment. The version of the Spark framework used in this chapter is 2.1.
The easiest way to learn Spark is to deploy it on localhost in standalone mode. You can either download a precompiled version of Spark from the website, or build the JAR files using the simple build tool (sbt) or Maven [12:14] as follows:
MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" mvn [args] –DskipTests clean package
Example:
Building on Hadoop 2.7 with the YARN cluster manager and Scala 2.11:
mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Dscala-2.11 -DskipTests clean package
sbt/sbt [args] assembly
Example:
Building on Hadoop 2.7 with the YARN cluster manager and Scala 2.11:
sbt -Pyarn -Phadoop-2.7 -Dscala-2.11 assembly
Apache Spark supports multiple deployment modes: local mode, standalone cluster mode, Apache Mesos, and Hadoop YARN.
The communication between a master node (or driver), a cluster manager, and a set of slave (or worker) nodes is illustrated in the following diagram:
Use any of the following methods to launch a Spark application or the Spark shell:
./bin/spark-submit --class application_class --master local[4] --num-executors 10 --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=512m" --executor-memory 12G myApplication.jar
The command launches the application, myApplication, with the main method, myApp.main, on a 4-core local host with 12 GB of executor memory.
./bin/spark-submit --class application_class --master spark://162.198.11.201:7077 --total-executor-cores 80 --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=512m" --driver-library-path xxx --executor-memory 24G myApplication.jar
Here, xxx stands for the path to the binary libraries required by the driver.
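For reference, the following is a minimal sketch of what the entry point of such an application might look like; the object name MyApp and the use of the first command-line argument as the input path are illustrative assumptions:
import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyApp")   // the master URL and memory settings come from spark-submit
    val sc = new SparkContext(conf)

    val lines = sc.textFile(args(0))                 // input path passed as the first argument
    println(s"Number of records: ${lines.count()}")
    sc.stop()
  }
}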
The following screenshot captures the output for launching a Spark shell. The standard output dumps the address of the Spark web UI (10.1.2.194:4040 in this case); the number of worker threads, [*] in local[*], is computed dynamically.
Potential pitfalls with Spark shell
Depending on your environment, you might need to disable logging information to the console by reconfiguring conf/log4j.properties. The Spark shell might also conflict with the declaration of the classpath in the profile or the environment variables list. In this case, it has to be replaced by the ADD_JARS environment variable, as in ADD_JARS=path1/jar1,path2/jar2.
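For instance, raising the root logger level in conf/log4j.properties keeps informational messages out of the console; the property name below follows the log4j 1.x template shipped with Spark 2.1 and might differ in other distributions:
# conf/log4j.properties: report only errors on the console
log4j.rootCategory=ERROR, console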