Cluster design

Apache Spark is a distributed, parallel processing system that also provides in-memory computing capabilities. This computing paradigm needs an associated storage system so that you can deploy your application on top of a big data cluster. For that, you will have to use distributed storage systems such as HDFS, S3, HBase, and Hive. For moving data, you will need other technologies such as Sqoop, Kinesis, Twitter, Flume, and Kafka.

In practice, you can configure a small Hadoop cluster very easily. You only need a single master and multiple worker nodes. In such a cluster, the master node generally runs the NameNode, a DataNode, the JobTracker, and a TaskTracker. A worker node, on the other hand, can be configured to work as both a DataNode and a TaskTracker.

For security reasons, most big data clusters are set up behind a network firewall. The computing nodes then have to overcome, or at least reduce, the complexity caused by the firewall; otherwise, they cannot be accessed from outside the network, that is, from the extranet. The following figure shows a simplified big data cluster that is commonly used with Spark:

Figure 2: A general architecture for big data processing with JVM

The preceding figure shows a cluster consisting of five computing nodes. Each node has a dedicated executor JVM, one per CPU core, and the Spark driver JVM sits outside the cluster. The disks are directly attached to the nodes using the JBOD (just a bunch of disks) approach. Very large files are partitioned over the disks, and a virtual file system such as HDFS makes these chunks available as one large virtual file. The following simplified component model shows the driver JVM sitting outside the cluster. It talks to the cluster manager (see Figure 4) to obtain permission to schedule tasks on the worker nodes, because the cluster manager keeps track of the resource allocation of all processes running on the cluster.

If you have developed your Spark application using Scala or Java, it means that your job is a JVM-based process. For your JVM-based process, you can simply configure the Java heap space by specifying the following two parameters:

  • -Xmx: This specifies the upper limit (maximum size) of your Java heap space
  • -Xms: This specifies the lower limit, that is, the initial size of the Java heap space
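As a small illustration of the two flags above, the following sketch builds the corresponding JVM arguments from sizes given in megabytes. The helper name jvm_heap_flags is hypothetical and exists only for this example; the `m` suffix is the JVM's standard notation for megabytes.

```python
def jvm_heap_flags(initial_mb: int, max_mb: int) -> list:
    """Build -Xms/-Xmx arguments from heap sizes in megabytes.

    Hypothetical helper for illustration; it only formats strings
    and does not start a JVM.
    """
    if initial_mb <= 0 or max_mb <= 0:
        raise ValueError("heap sizes must be positive")
    if initial_mb > max_mb:
        raise ValueError("initial heap (-Xms) cannot exceed maximum heap (-Xmx)")
    return [f"-Xms{initial_mb}m", f"-Xmx{max_mb}m"]


if __name__ == "__main__":
    # Prints: -Xms256m -Xmx512m
    print(" ".join(jvm_heap_flags(256, 512)))
```

Note that for Spark executors the maximum heap is normally set through spark.executor.memory (or the --executor-memory option of spark-submit) rather than by passing -Xmx directly.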

Once you have submitted a Spark job, heap memory needs to be allocated for it. The following figure provides some insight into how this happens:

Figure 3: JVM memory management

As demonstrated in the preceding figure, Spark starts a Spark job with 512 MB of JVM heap space. However, to keep your Spark job running without interruption and to avoid Out of Memory (OOM) errors, Spark allows the computing nodes to use only up to 90% of the heap (that is, ~461 MB); you can raise or lower this limit through the spark.storage.safetyFraction parameter in the Spark environment. More realistically, the JVM heap can be seen as a concatenation of storage (60% of the Java heap), execution, also known as shuffle (20% of the heap), and other storage (the remaining 20%). Note that the spark.storage.* and spark.shuffle.* fractions discussed here belong to Spark's legacy memory manager; from Spark 1.6 onward, the default unified memory manager is configured through spark.memory.fraction and spark.memory.storageFraction instead.

Moreover, Spark is a cluster computing tool that tries to use both in-memory and disk-based computing and allows users to store some data in memory. In reality, Spark uses the main memory only for its LRU cache. For the caching mechanism to work without interruption, some memory must be reserved for application-specific data processing. Informally, this is around 60% of the Java heap space, controlled by spark.memory.fraction.

Therefore, if you would like to see or calculate how much application-specific data you can cache in memory in your Spark application, you can sum up the heap sizes used by all the executors and multiply the total by the safetyFraction and spark.memory.fraction. In practice, you can allow Spark computing nodes to use 54% of the total heap size (0.9*0.6 = 0.54, that is, 276.48 MB of a 512 MB heap). Now the shuffle memory is calculated as follows:

Shuffle memory = Heap Size * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction

The default values for spark.shuffle.safetyFraction and spark.shuffle.memoryFraction are 80% and 20%, respectively. Therefore, in practice, you can use up to 0.8*0.2 = 16% of the JVM heap for the shuffle. Finally, unroll memory is the amount of main memory (in a computing node) that can be used by the unroll processes. The calculation goes as follows:

Unroll memory = Heap Size * spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction

With the default values, this is around 11% of the heap (0.2*0.6*0.9 = 0.108), that is, about 55.3 MB of a 512 MB Java heap.
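The three calculations above (cacheable storage, shuffle, and unroll memory) can be reproduced with a few lines of arithmetic. The following is a minimal sketch, assuming the default fractions quoted in the text and the 512 MB heap from Figure 3; the function and constant names are illustrative only and are not part of any Spark API. Multiplying the unrounded 0.108 by 512 MB gives 55.296 MB, slightly below the value obtained from the rounded 11%.

```python
# Default fractions as quoted in the text (legacy Spark memory settings).
STORAGE_SAFETY_FRACTION = 0.9   # spark.storage.safetyFraction
STORAGE_MEMORY_FRACTION = 0.6   # storage share of the heap
SHUFFLE_SAFETY_FRACTION = 0.8   # spark.shuffle.safetyFraction
SHUFFLE_MEMORY_FRACTION = 0.2   # spark.shuffle.memoryFraction
UNROLL_FRACTION = 0.2           # spark.storage.unrollFraction


def storage_memory_mb(executor_heaps_mb):
    """Cacheable memory: sum of all executor heaps times both storage fractions."""
    return sum(executor_heaps_mb) * STORAGE_SAFETY_FRACTION * STORAGE_MEMORY_FRACTION


def shuffle_memory_mb(heap_mb):
    """Shuffle memory for a single heap."""
    return heap_mb * SHUFFLE_SAFETY_FRACTION * SHUFFLE_MEMORY_FRACTION


def unroll_memory_mb(heap_mb):
    """Unroll memory for a single heap (a share of the storage region)."""
    return (heap_mb * UNROLL_FRACTION
            * STORAGE_MEMORY_FRACTION * STORAGE_SAFETY_FRACTION)


if __name__ == "__main__":
    heap_mb = 512  # heap size used in the text's example
    print(f"storage: {storage_memory_mb([heap_mb]):.2f} MB")  # storage: 276.48 MB
    print(f"shuffle: {shuffle_memory_mb(heap_mb):.2f} MB")    # shuffle: 81.92 MB
    print(f"unroll:  {unroll_memory_mb(heap_mb):.2f} MB")     # unroll:  55.30 MB
```

Passing several heap sizes to storage_memory_mb, for example [512, 512, 512], gives the total cacheable memory across that many executors.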

A more detailed discussion can be found at http://spark.apache.org/docs/latest/configuration.html.

As we will see later, there are a variety of cluster managers, some of which can also manage other Hadoop workloads or even non-Hadoop applications in parallel with the Spark executors. Note that the executors and the driver communicate bidirectionally all the time, so network-wise they should sit close together.

Figure 4: Driver, master, and worker architecture in Spark for cluster

Spark uses a driver (aka the driver program), master, and worker (aka host, slave, or computing node) architecture. The driver program (or machine) talks to a single coordinator called the master node. The master node manages all the workers, on which several executors run in parallel in a cluster. Note that the master is also a computing node, with large memory, storage, an OS, and underlying computing resources. Conceptually, this architecture is shown in Figure 4. More details will be discussed later in this section.

In a real cluster mode, the cluster manager (aka the resource manager) manages all the resources of the computing nodes in a cluster. Generally, firewalls add security to the cluster but also increase its complexity. Ports between system components need to be opened so that they can talk to each other. For instance, Zookeeper is used by many components for configuration. Apache Kafka, which is a publish-subscribe messaging system, uses Zookeeper for configuring its topics, groups, consumers, and producers. So, client ports to Zookeeper, potentially across the firewall, need to be open.
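One way to confirm that a required port is actually open across the firewall is to attempt a TCP connection to it. The following is a minimal sketch using only the Python standard library; the function name port_is_reachable is an assumption made for this example.

```python
import socket


def port_is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

For example, port_is_reachable("zk1.example.internal", 2181) would check Zookeeper's default client port, where zk1.example.internal is a placeholder hostname. A successful connection only shows that the port is open; it does not verify that the service behind it is healthy.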

Finally, the allocation of systems to cluster nodes needs to be considered. For instance, if Apache Spark uses Flume or Kafka, then in-memory channels will be used. Apache Spark should not compete with other Apache components for memory. Depending on your data flows and memory usage, it might be necessary to run Spark, Hadoop, Zookeeper, Flume, and other tools on distinct cluster nodes. Alternatively, resource managers such as YARN or Mesos, or containers such as Docker, can be used to tackle this problem. In standard Hadoop environments, YARN is most likely already present.

The computing nodes that act as workers, or Spark master, will need greater resources than the cluster processing nodes within the firewall. When many Hadoop ecosystem components are deployed on the cluster, all of them will need extra memory on the master server. You should monitor worker nodes for resource usage and adjust resources and/or application placement as necessary. YARN, for instance, takes care of this.

This section has briefly set the scene for the big data cluster in terms of Apache Spark, Hadoop, and other tools. However, how might the Apache Spark cluster itself, within the big data cluster, be configured? For instance, it is possible to have many types of Spark cluster manager. The next section will examine this and describe each type of Apache Spark cluster manager.
