MLlib is the machine learning library provided with Apache Spark, the in-memory, cluster-based, open source data processing system. In this chapter, I will examine the functionality provided within the MLlib library in areas such as regression, classification, and neural processing. I will examine the theory behind each algorithm before providing working examples that tackle real problems, since the example code and documentation on the web can be sparse and confusing. I will take a step-by-step approach in describing how the following algorithms can be used, and what they are capable of doing:
Having decided to learn about Apache Spark, I am assuming that you are familiar with Hadoop. Before I proceed, I will explain a little about my environment: my Hadoop cluster is installed on a set of CentOS 6.5 64-bit Linux servers. The following section describes the architecture in detail.
Before delving into the Apache Spark modules, I want to explain the structure and versions of the Hadoop and Spark clusters that I will use in this book. I will be using the Cloudera CDH 5.1.3 version of Hadoop for storage, and I will be using two versions of Spark in this chapter: 1.0 and 1.3.
The earlier version is compatible with the Cloudera software, and has been tested and packaged by them. It is installed as a set of Linux services from the Cloudera repository using the yum command. Because I want to examine the neural network technology that has not yet been released, I will also download and run the development version of Spark 1.3 from GitHub. This will be explained later in the chapter.
The following diagram explains the structure of the small Hadoop cluster that I will use in this chapter:
The previous diagram shows a five-node Hadoop cluster with a NameNode called hc2nn, and DataNodes hc2r1m1 to hc2r1m4. It also shows an Apache Spark cluster with a master node and four slave nodes. The Hadoop cluster provides the physical CentOS 6 Linux machines, while the Spark cluster runs on the same hosts. For instance, the Spark master server runs on the Hadoop NameNode machine hc2nn, whereas the Spark slave1 worker runs on the host hc2r1m1.
The Linux server naming standard used above should be explained. For instance, the Hadoop NameNode server is called hc2nn. The h in this server name means Hadoop, the c means cluster, and the nn means NameNode; so, hc2nn means Hadoop cluster 2 NameNode. Similarly, for the server hc2r1m1, the h means Hadoop, the c means cluster, the r means rack, and the m means machine; so, the name stands for Hadoop cluster 2, rack 1, machine 1. In a large Hadoop cluster, the machines are organized into racks, so this naming standard makes the servers easy to locate.
You can arrange your Spark and Hadoop clusters as you see fit; they don't need to be on the same hosts. For the purpose of writing this book, I have a limited number of machines available, so it makes sense to co-locate the Hadoop and Spark clusters. You can use entirely separate machines for each cluster, as long as Spark is able to access Hadoop (if you want to use it for distributed storage).
Remember that although Spark is used for the speed of its in-memory distributed processing, it doesn't provide storage. You can use the host filesystem to read and write your data, but if your data volumes are big enough to be described as big data, then it makes sense to use a distributed storage system like Hadoop.
Remember also that Apache Spark may only be the processing step in your ETL (Extract, Transform, Load) chain. It doesn't provide the rich tool set that the Hadoop ecosystem contains. You may still need Nutch/Gora/Solr for data acquisition; Sqoop and Flume for moving data; Oozie for scheduling; and HBase, or Hive for storage. The point that I am making is that although Apache Spark is a very powerful processing system, it should be considered a part of the wider Hadoop ecosystem.
Having described the environment that will be used in this chapter, I will move on to describe the functionality of the Apache Spark MLlib (Machine Learning library).
The Scala language will be used for the coding samples in this book, because, being concise, it requires less code than Java. Scala can be used from the Spark shell, as well as compiled into Apache Spark applications. I will be using the sbt tool to compile the Scala code, which I have installed as follows:
[hadoop@hc2nn ~]$ su -
[root@hc2nn ~]# cd /tmp
[root@hc2nn tmp]# wget http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.rpm
[root@hc2nn tmp]# rpm -ivh sbt.rpm
For convenience while writing this book, I have used a generic Linux account called hadoop on the Hadoop NameNode server hc2nn. As the previous commands show, I need to install sbt as the root account, which I have accessed via su (switch user). I have then downloaded the sbt.rpm file to the /tmp directory, from the web-based server called repo.scala-sbt.org, using wget. Finally, I have installed the rpm file using the rpm command with the options i for install, v for verbose, and h to print hash marks while the package is being installed.
Downloading the example code
You can download the example code files from your account at http://www.packtpub.com for all the Packt Publishing books you have purchased. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
I have developed all of the Scala code for Apache Spark in this chapter on the Linux server hc2nn, using the Linux hadoop account. I have placed each set of code within a subdirectory under /home/hadoop/spark. For instance, the following sbt structure diagram shows that the MLlib Naïve Bayes code is stored within a subdirectory called nbayes, under the spark directory. The diagram also shows that the Scala code is developed within a subdirectory structure named src/main/scala, under the nbayes directory. The files called bayes1.scala and convert.scala contain the Naïve Bayes code that will be used in the next section:
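This standard sbt project layout can be recreated with a few shell commands. The following is a minimal sketch; it uses a scratch directory (/tmp/spark-demo) rather than the book's /home/hadoop/spark base, and simply creates empty placeholder files:

```shell
# Recreate the sbt project skeleton described above, under a scratch base directory.
BASE=/tmp/spark-demo
mkdir -p "$BASE/nbayes/src/main/scala"
touch "$BASE/nbayes/bayes.sbt"
touch "$BASE/nbayes/src/main/scala/bayes1.scala"
touch "$BASE/nbayes/src/main/scala/convert.scala"
# List what was created
find "$BASE" -mindepth 1 | sort
```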
The bayes.sbt file is a configuration file used by the sbt tool, which describes how to compile the Scala files within the project (note that if you were developing in Java, you would use a path of the form nbayes/src/main/java). The contents of the bayes.sbt file are shown next. The pwd and cat Linux commands show the file location and dump the file contents.
The name, version, and scalaVersion options set the details of the project, and the version of Scala to be used. The libraryDependencies options define where the Hadoop and Spark libraries can be located. In this case, CDH5 has been installed using the Cloudera parcels, and the package libraries can be located in the standard locations, that is, /usr/lib/hadoop for Hadoop and /usr/lib/spark for Spark. The resolvers option specifies the location of the Cloudera repository for other dependencies:
[hadoop@hc2nn nbayes]$ pwd
/home/hadoop/spark/nbayes
[hadoop@hc2nn nbayes]$ cat bayes.sbt

name := "Naive Bayes"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0"
// If using CDH, also add the Cloudera repository
resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
The Scala nbayes project code can be compiled from the nbayes subdirectory using this command:
[hadoop@hc2nn nbayes]$ sbt compile
The sbt compile command is used to compile the code into classes. The classes are then placed in the nbayes/target/scala-2.10/classes directory. The compiled classes can be packaged into a JAR file with this command:
[hadoop@hc2nn nbayes]$ sbt package
The sbt package command will create a JAR file under the directory nbayes/target/scala-2.10. As the example in the sbt structure diagram shows, the JAR file named naive-bayes_2.10-1.0.jar has been created after a successful compile and package. This JAR file, and the classes that it contains, can then be used in a spark-submit command. This will be described later, as the functionality in the Apache Spark MLlib module is explored.
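As a preview, such a spark-submit invocation might look like the following sketch. The application class name (bayes1) is an assumption at this point, and the master URL matches the standalone cluster configured later in this section; this command requires a running cluster, so it is shown for illustration only:

```shell
# Illustrative only: submit the packaged Naive Bayes JAR to the standalone cluster.
# The --class value (bayes1) is an assumed application entry point.
spark-submit \
  --class bayes1 \
  --master spark://hc2nn.semtech-solutions.co.nz:7077 \
  /home/hadoop/spark/nbayes/target/scala-2.10/naive-bayes_2.10-1.0.jar
```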
Finally, when describing the environment used for this book, I wanted to touch on the approach to installing and running Apache Spark. I won't elaborate on the Hadoop CDH5 install, except to say that I installed it using the Cloudera parcels. However, I manually installed version 1.0 of Apache Spark from the Cloudera repository, using the Linux yum command. I installed the service-based packages, because I wanted the flexibility to install multiple versions of Spark as services from Cloudera, as needed.
When preparing a CDH Hadoop release, Cloudera takes the code that has been developed by the Apache Spark team, and the code released by the Apache Bigtop project. They perform an integration test so that it is guaranteed to work as a code stack. They also reorganize the code and binaries into services and parcels. This means that libraries, logs, and binaries can be located in defined locations under Linux, that is, /var/log/spark and /usr/lib/spark. It also means that, in the case of services, the components can be installed using the Linux yum command, and managed via the Linux service command.
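As a sketch of what that service-based management looks like in practice (these invocations assume the Cloudera spark-master and spark-worker packages are installed, so they are illustrative only; output will vary by host):

```shell
# Illustrative only: manage Cloudera-packaged Spark daemons via the service command.
service spark-master status     # check whether the master daemon is running
service spark-worker restart    # restart the worker daemon on this host
chkconfig --list | grep spark   # see which Spark services start at boot
```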
However, a different approach was used in the case of the neural network code described later in this chapter. This is how Apache Spark 1.0 was installed for use with Hadoop CDH5:
[root@hc2nn ~]# cd /etc/yum.repos.d
[root@hc2nn yum.repos.d]# cat cloudera-cdh5.repo
[cloudera-cdh5]
# Packages for Cloudera's Distribution for Hadoop, Version 5, on RedHat or CentOS 6 x86_64
name=Cloudera's Distribution for Hadoop, Version 5
baseurl=http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5/
gpgkey = http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
gpgcheck = 1
The first step is to ensure that a Cloudera repository file exists under the /etc/yum.repos.d directory on the server hc2nn, and on all of the other Hadoop cluster servers. The file is called cloudera-cdh5.repo, and specifies where the yum command can locate software for the Hadoop CDH5 cluster. On all the Hadoop cluster nodes, I use the Linux yum command, as root, to install the Apache Spark components core, master, worker, history-server, and python:
[root@hc2nn ~]# yum install spark-core spark-master spark-worker spark-history-server spark-python
This gives me the flexibility to configure Spark in any way that I want in the future. Note that I have installed the master component on all the nodes, even though I only plan to use it on the NameNode at this time. Now, the Spark install needs to be configured on all the nodes. The configuration files are stored under /etc/spark/conf. The first thing to do is to set up a slaves file, which specifies the hosts on which Spark will run its worker components:
[root@hc2nn ~]# cd /etc/spark/conf
[root@hc2nn conf]# cat slaves
# A Spark Worker will be started on each of the machines listed below.
hc2r1m1
hc2r1m2
hc2r1m3
hc2r1m4
As you can see from the contents of the slaves file above, Spark will run four workers on the Hadoop CDH5 cluster DataNodes, hc2r1m1 to hc2r1m4. Next, I will alter the contents of the spark-env.sh file to specify the Spark environment options. The SPARK_MASTER_IP value is defined as the full server name:
export STANDALONE_SPARK_MASTER_HOST=hc2nn.semtech-solutions.co.nz
export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST
export SPARK_MASTER_WEBUI_PORT=18080
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=18081
The web user interface port numbers are specified for the master and worker processes, as well as the operational port numbers. The Spark service can then be started as root from the NameNode server. I use the following script:
echo "hc2r1m1 - start worker"
ssh hc2r1m1 'service spark-worker start'
echo "hc2r1m2 - start worker"
ssh hc2r1m2 'service spark-worker start'
echo "hc2r1m3 - start worker"
ssh hc2r1m3 'service spark-worker start'
echo "hc2r1m4 - start worker"
ssh hc2r1m4 'service spark-worker start'
echo "hc2nn - start master server"
service spark-master start
service spark-history-server start
This starts the Spark worker service on all of the slaves, and the master and history server on the NameNode hc2nn. Now, the Spark user interface can be accessed via the URL http://hc2nn:18080.
The following figure shows an example of the Spark 1.0 master web user interface. It shows details about the Spark install, the workers, and the applications that are running or completed. The statuses of the master and workers are given; in this case, all are alive. Memory used and available is given in total and per worker. Although there are no applications running at the moment, each worker link can be selected to view the executor processes running on each worker node, as the work volume for each application run is spread across the Spark cluster.
Note also that the Spark URL, spark://hc2nn.semtech-solutions.co.nz:7077, will be used when running Spark applications with tools like spark-shell and spark-submit. Using this URL, it is possible to ensure that the shell or application is run against this Spark cluster.
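For example, attaching an interactive shell to this specific cluster (rather than a local master) is a one-line sketch; it requires the running cluster above, so it is shown for illustration only:

```shell
# Illustrative only: attach an interactive shell to the standalone cluster started above.
spark-shell --master spark://hc2nn.semtech-solutions.co.nz:7077
# Once inside the shell, evaluating sc.master confirms which cluster the
# SparkContext is connected to.
```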
This gives a quick overview of the Apache Spark installation using services, its configuration, how to start it, and how to monitor it. Now, it is time to tackle the first of the MLlib functional areas, which is classification using the Naïve Bayes algorithm. The use of Spark will become clearer as Scala scripts are developed, and the resulting applications are monitored.