Apache Spark core

The resilient distributed dataset (RDD) is the core data structure of the Apache Spark architecture. RDDs store data distributed and partitioned over multiple processors and servers so that operations can be executed concurrently.

Data frames were added later to extend RDDs with SQL functionality. The original Apache Spark machine learning library, MLlib, relies on the lower-level RDD API. The more recent ML library allows data scientists to describe transformations and actions on data frames, using SQL-like constructs.

Note

Deprecation of the RDD-based API in MLlib

The RDD-based classes and methods in MLlib moved to maintenance mode in Spark 2.0 and are expected to be removed in Spark 3.0.

Why Spark?

The introduction of the Hadoop ecosystem more than 10 years ago opened the door to large-scale data processing and analytics. The Hadoop framework relies on a very effective distributed filesystem, HDFS, suitable for processing a large number of files containing sequential data. However, this reliance on the distributed filesystem for managing data becomes a bottleneck when executing iterative computation such as dynamic programming and machine learning algorithms.

Apache Spark lays out an efficient, resilient memory management layer on top of HDFS. This design provides developers with a very significant performance improvement, from tenfold to a hundredfold, depending on the type of application.

Spark provides data engineers with a wide array of prebuilt transformations and actions inspired by the Scala collections API, so Scala developers can leverage their knowledge of the language. However, the Spark API is also available in Java and Python.

Design principles

The performance of Spark relies on four core design principles [17:03]:

  • In-memory persistency
  • Laziness in scheduling tasks
  • Transformations and actions applied to RDDs
  • Implementation of shared variables

In-memory persistency

The developer can decide to persist and/or cache an RDD for future usage. An RDD may be persisted in memory only, on disk only, or in memory if available and spilled to disk otherwise, with the elements stored as deserialized or serialized Java objects.

For instance, an RDD, rdd, can be persisted in memory as serialized objects through a simple statement, as shown in the following code:

rdd.persist(StorageLevel.MEMORY_ONLY_SER)
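The following sketch puts persistency in context, assuming a local Spark session; the application name, the input range, and the alternative storage level mentioned in the comments are illustrative choices, not requirements:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder.master("local[*]").appName("PersistDemo").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(0 until 1000000)    // hypothetical input
rdd.persist(StorageLevel.MEMORY_ONLY_SER)    // serialized objects, memory only
// StorageLevel.MEMORY_AND_DISK would spill partitions to disk when memory is exhausted

val count = rdd.count                        // the first action materializes and caches the partitions
rdd.unpersist()                              // release the cached partitions when no longer needed
spark.stop()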

Note

Kryo serialization

Java serialization through the Serializable interface is notoriously slow. Fortunately, the Spark framework allows the developer to specify a more efficient serialization mechanism such as the Kryo library.
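As a hedged sketch, Kryo is enabled through the Spark configuration; the Trade class, the application name, and the local master below are illustrative assumptions:

import org.apache.spark.{SparkConf, SparkContext}

case class Trade(symbol: String, price: Double)    // hypothetical class registered with Kryo

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("KryoDemo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Trade]))      // registration avoids writing full class names in the stream

val sc = new SparkContext(conf)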

Laziness

Scala supports lazy values natively. The expression on the right-hand side of a lazy value assignment, whether a literal value, an object reference, or a method call, is evaluated only once, the first time the value is accessed, as shown in the following code:

class Pipeline {
  lazy val x = { println("x"); 1.5 }
  lazy val m = { println("m"); 3 }

  val n = { println("n"); 6 }
  def f = m << 1
  def g(j: Int) = Math.pow(x, j)
}
val pipeline = new Pipeline //1
pipeline.g(pipeline.f)  //2

The order of the values printed is n, m, and then x. The instantiation of the Pipeline class initializes n, but neither m nor x (line 1). At a later stage, the g method is called, which in turn invokes the f method. The f method initializes the value m it needs, and then g initializes x to compute x raised to the power m << 1 (line 2).

Spark applies the same principle to RDDs by executing a transformation only when an action is performed. In other words, Spark postpones memory allocation, parallelization, and computation until the driver code retrieves a result through the execution of an action. The cascading effect of invoking all these transformations backwards is handled by the directed acyclic graph (DAG) scheduler.
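The following is a minimal sketch of this lazy execution, assuming a local Spark session; the application name, input values, and doubling transformation are illustrative only:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("LazyDemo").getOrCreate()
val sc = spark.sparkContext

val raw = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))   // no computation yet: only the lineage is recorded
val doubled = raw.map(_ * 2.0)                      // transformation: still nothing is executed
val filtered = doubled.filter(_ > 3.0)              // transformation: the lineage grows

val total = filtered.reduce(_ + _)                  // action: the DAG scheduler executes the whole chain
spark.stop()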

Transforms and actions

Spark is implemented in Scala, so you should not be too surprised that the most relevant higher-order methods of the Scala collections are supported in Spark. The first table describes the Spark transformation methods, as well as their counterparts in the Scala standard library. We use the (K, V) notation for (key, value) pairs:

Spark            | Scala      | Description
map(f)           | map(f)     | Transform an RDD by executing the function f on each element of the collection
filter(f)        | filter(f)  | Transform an RDD by selecting the elements for which the function f returns true
flatMap(f)       | flatMap(f) | Transform an RDD by mapping each element to a sequence of output items
mapPartitions(f) |            | Execute the map method separately on each partition
sample           |            | Sample a fraction of the data, with or without replacement, using a random generator
groupByKey       | groupBy    | Called on (K, V) pairs to generate a new (K, Seq(V)) RDD
union            | union      | Create a new RDD as the union of this RDD and the argument
distinct         | distinct   | Eliminate duplicate elements from this RDD
reduceByKey(f)   | reduce     | Aggregate or reduce the values corresponding to each key using the function f
sortByKey        | sortWith   | Reorganize the (K, V) pairs of an RDD in the ascending, descending, or otherwise specified order of the keys, K
join             |            | Join an RDD of (K, V) pairs with an RDD of (K, W) pairs to generate a new RDD of (K, (V, W)) pairs
cogroup          |            | Implement a join operation, but generate an RDD of (K, Seq(V), Seq(W)) tuples

Action methods trigger the collection or the reduction of the datasets from all partitions back to the driver, as listed here:

Spark          | Scala     | Description
reduce(f)      | reduce(f) | Aggregate all the elements of the RDD across all the partitions and return a Scala object to the driver
collect        | collect   | Collect and return all the elements of the RDD across all the partitions as a list in the driver
count          | count     | Return the number of elements in the RDD to the driver
first          | head      | Return the first element of the RDD to the driver
take(n)        | take(n)   | Return the first n elements of the RDD to the driver
takeSample     |           | Return an array of random elements from the RDD back to the driver
saveAsTextFile |           | Write the elements of the RDD as a text file to either the local filesystem or HDFS
countByKey     |           | Return a (K, count) map to the driver with the original keys, K, and the count of values for each key
foreach        | foreach   | Execute a T => Unit function on each element of the RDD

Scala collection methods such as find, drop, and flatten are not implemented for RDDs. Other Scala methods, such as zip, have to be used carefully, as there is no guarantee that the order of the elements in the two zipped collections is preserved across partitions.
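As a brief, hedged illustration of how a few of these transformations and actions chain together, the following sketch counts words over an in-memory collection; the input strings and the application name are arbitrary:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("TransformDemo").getOrCreate()
val sc = spark.sparkContext

val lines = sc.parallelize(Seq("spark rdd", "spark core", "lazy rdd"))  // illustrative input
val counts = lines
  .flatMap(_.split(" "))          // transformation: one word per element
  .map(word => (word, 1))         // transformation: (K, V) pairs
  .reduceByKey(_ + _)             // transformation: aggregate the counts for each key
  .sortByKey()                    // transformation: order by key

counts.collect.foreach(println)   // action: results are returned to the driver
println(counts.count)             // action: number of distinct words
spark.stop()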

Shared variables

In a perfect world, variables are immutable and local to each partition to avoid race conditions. However, there are circumstances where variables have to be shared without breaking the immutability provided by Spark. To this end, Spark duplicates shared variables and copies them to each partition of the dataset. Spark supports the following types of shared variables, illustrated in the sketch after this list:

  • Broadcast values: These values encapsulate and forward data to all the partitions
  • Accumulator variables: These variables act as summations or reference counters
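Here is a minimal sketch of both kinds of shared variables, assuming a local Spark session; the currency map, the accumulator name, and the sample amounts are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("SharedVarsDemo").getOrCreate()
val sc = spark.sparkContext

val rates = sc.broadcast(Map("USD" -> 1.0, "EUR" -> 1.08))  // broadcast value: read-only copy on each executor
val errors = sc.longAccumulator("parse errors")             // accumulator: executors only add to it

val amounts = sc.parallelize(Seq(("USD", 100.0), ("EUR", 50.0), ("XYZ", 10.0)))
val inUsd = amounts.map { case (ccy, amount) =>
  rates.value.get(ccy) match {
    case Some(rate) => amount * rate
    case None => errors.add(1L); 0.0
  }
}
println(inUsd.sum)       // the action triggers the computation on the executors
println(errors.value)    // the accumulator value is read back in the driver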

The four design principles can be summarized in the following diagram:


Interaction between Spark driver and RDDs

The preceding diagram illustrates the most common interaction between the Spark driver and its workers, as listed in the following steps and sketched in the code after the list:

  1. The input data, residing in either memory as a Scala collection or HDFS as a text file, is parallelized and partitioned into an RDD.
  2. A transformation function is applied on each element of the dataset across all the partitions.
  3. An action is performed to reduce and collect the data back to the driver.
  4. The data is processed locally within the driver.
  5. A second parallelization is performed to distribute computation through the RDDs.
  6. A variable is broadcast to all the partitions as an external parameter of the last RDD transformation.
  7. Finally, the last action aggregates and collects the final result back into the driver.
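A minimal sketch of these seven steps, assuming a local Spark session; the squaring transformation, the mean used as a broadcast threshold, and the application name are illustrative choices:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("DriverFlow").getOrCreate()
val sc = spark.sparkContext

val input = sc.parallelize(1 to 100)          // 1. parallelize and partition the input data
val squared = input.map(n => n * n)           // 2. transformation applied across all partitions
val rawSum = squared.reduce(_ + _)            // 3. action reduces the data back to the driver

val mean = rawSum.toDouble / 100              // 4. local processing within the driver

val second = sc.parallelize(1 to 100)         // 5. second parallelization
val threshold = sc.broadcast(mean)            // 6. broadcast as an external parameter
val result = second.map(n => n * n)
  .filter(_ > threshold.value)                //    transformation using the broadcast value
  .count                                      // 7. final action collects the result in the driver
println(result)
spark.stop()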

Note

Similarity with Akka

If you look closely, the management of datasets and RDDs by the Spark driver is not very different from the way an Akka master actor manages its worker actors and futures.

Experimenting with Spark

Spark's in-memory computation enables the iterative computing found in the training of machine learning models or in the execution of dynamic programming and optimization algorithms. Spark runs on Windows, Linux, and macOS. It can be deployed either in local mode on a single host, or in cluster mode for a distributed environment. The version of the Spark framework used in this chapter is 2.1.

Note

JVM and Scala compatible versions

At the time of writing, Spark 2.1 required Java 1.7 or higher and Scala 2.11.5 or higher. Future releases may require Scala 2.12.
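For reference, a possible sbt configuration matching these constraints is shown below; the exact patch versions (2.11.8 and 2.1.0) are assumptions, not requirements:

// build.sbt (illustrative versions)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.1.0",
  "org.apache.spark" %% "spark-mllib" % "2.1.0"
)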

Deploying Spark

The easiest way to learn Spark is to deploy it on a localhost in standalone mode. You can either download a precompiled version of Spark from the website, or build the JAR files using the simple build tool (sbt) or Maven [12:14] as follows:

  1. Go to the download page at http://spark.apache.org/downloads.html.
  2. Choose a package type (Hadoop distribution). The Spark framework relies on HDFS to run in cluster mode; therefore, you need to select a distribution of Hadoop, either the open source Apache distribution or a packaged one such as MapR or Cloudera.
  3. Download and decompress the package.
  4. If you are interested in the latest functionality added to the framework, check out the newest source code at https://github.com/apache/spark.
  5. Next, you need to build, or assemble, the Apache Spark libraries from the top-level directory using either Maven or sbt.
  6. Set the following Maven options to support build, deployment, and execution:
    MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=512M
              -XX:ReservedCodeCacheSize=512m"
    mvn [args] –DskipTests clean package
    

    Example:

    Building on Hadoop 2.7 using Yarn clusters manager and Scala 2.11:

    mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Dscala-2.11 \
        -DskipTests clean package
    
  7. Use the following command for the simple build tool:
    sbt/sbt [args] assembly
    

    Example:

    Building on Hadoop 2.7 using Yarn clusters manager and Scala 2.11:

    sbt/sbt -Pyarn -Phadoop-2.7 -Dscala-2.11 assembly
    

Note

Installation instructions

The directory and name of artifacts used in Spark will undoubtedly change over time. Please refer to the documentation and installation guide for the latest version of Spark.

Apache Spark supports multiple deployment modes, listed here and sketched in the code after the list:

  • Standalone mode: The driver and executors run as master and slave Akka actors, bundled with the default Spark distribution JAR file.
  • Local mode: This is a standalone mode running on a single host. The slave actors are deployed across multiple cores within the same host.
  • YARN cluster manager: Spark relies on the YARN resource manager, which runs on Hadoop version 2 and higher. The Spark driver can run either in the same JVM as the client application (client mode) or in the same JVM as the YARN application master (cluster mode).
  • Apache Mesos resource manager: This deployment allows dynamic and scalable partitioning. Apache Mesos is an open source, general purpose cluster manager that has to be installed separately (refer to http://mesos.apache.org/). Mesos manages the abstracted hardware artifacts such as memory or storage.
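The deployment mode is ultimately selected through the master URL passed to the Spark context or to spark-submit. The following Scala sketch lists typical master URLs; the host names, ports, and application name are placeholders:

import org.apache.spark.sql.SparkSession

val builder = SparkSession.builder.appName("DeployDemo")

// Uncomment the line matching the target deployment mode:
// val spark = builder.master("local[4]").getOrCreate()                   // local mode, 4 cores
// val spark = builder.master("spark://192.168.1.10:7077").getOrCreate()  // standalone cluster
// val spark = builder.master("yarn").getOrCreate()                       // YARN (HADOOP_CONF_DIR must be set)
// val spark = builder.master("mesos://192.168.1.10:5050").getOrCreate()  // Apache Mesos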

The communication between a master node (or driver), a cluster manager, and a set of slave (or worker) nodes is illustrated in the following diagram:


Master-worker communication through a cluster manager

Tip

Installation under Windows

Hadoop relies on some UNIX/Linux utilities that need to be added to the development environment when running on Windows. The winutils.exe file has to be installed and referenced through the HADOOP_HOME environment variable.
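A commonly used workaround in driver code, assuming winutils.exe has been copied to C:\hadoop\bin (a hypothetical location), is to set the hadoop.home.dir system property before creating the Spark session:

// The path below is a placeholder; winutils.exe must reside in its bin subdirectory
System.setProperty("hadoop.home.dir", "C:\\hadoop")

val spark = org.apache.spark.sql.SparkSession.builder
  .master("local[*]")
  .appName("WindowsDemo")
  .getOrCreate()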

Using Spark shell

There are several ways to work with the Spark shell:

  • The shell is an easy way to get your feet wet with Spark RDDs. To launch the shell locally, execute ./bin/spark-shell --master local[8] to run the shell on an 8-core localhost.
  • To launch a Spark application locally, connect with the shell and execute the following command line:
    ./bin/spark-submit \
      --class application_class \
      --master local[4] \
      --num-executors 10 \
      --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=512m" \
      --executor-memory 12G \
      myApplication.jar
    

    The command launches the application packaged in myApplication.jar, whose main class is application_class, on a 4-core localhost with 12 GB of executor memory.

  • To launch the same Spark application remotely, connect with the shell and execute the following command line (xxx stands for the path to the native binary libraries):
    ./bin/spark-submit \
      --class application_class \
      --master spark://162.198.11.201:7077 \
      --total-executor-cores 80 \
      --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=512m" \
      --driver-library-path xxx \
      --executor-memory 24G \
      myApplication.jar
    

The following screenshot captures the output of launching a Spark shell. The standard output dumps:

  • The IP address and port of the web-based UI used to monitor the staging and execution of RDDs and data frames: 10.1.2.194:4040
  • The deployment mode: a standalone, local deployment for which the number of cores [*] is computed dynamically
  • The session-based context

Partial screenshot of Spark shell command line output

Tip

Potential pitfalls with Spark shell

Depending on your environment, you might need to disable logging information to the console by reconfiguring conf/log4j.properties. The Spark shell might also conflict with the declaration of the classpath in your profile or environment variables. In this case, the classpath has to be specified through the ADD_JARS environment variable, as in ADD_JARS=path1/jar1,path2/jar2.
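One common way to reduce the console noise from within the shell or the driver code, assuming the default log4j logging backend bundled with Spark, is sketched here:

import org.apache.log4j.{Level, Logger}

// Silence Spark's and Akka's internal logging; application-level logging is unaffected
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)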
