Chapter 12. Distributed Machine Learning with MLlib

Machine learning describes the construction of algorithms that make predictions from data. It is a core component of most data science pipelines, and is often seen as the component that adds the most value: the accuracy of the machine learning algorithm determines the success of the data science endeavor. It is also, arguably, the part of the data science pipeline that requires the most knowledge from fields beyond software engineering: a machine learning expert will be familiar not just with algorithms, but also with statistics and with the business domain.

Choosing and tuning a machine learning algorithm to solve a particular problem involves significant exploratory analysis to determine which features are relevant, how features are correlated, whether there are outliers in the dataset, and so on. Designing suitable machine learning pipelines is difficult. Add an additional layer of complexity resulting from the size of the datasets and the need for scalability, and you have a real challenge.

MLlib helps mitigate this difficulty. MLlib is a component of Spark that provides machine learning algorithms on top of the core Spark libraries. It offers a set of learning algorithms that parallelize well over distributed datasets.

MLlib has evolved into two separate layers. MLlib itself contains the core algorithms, and ml, also called the pipeline API, defines an API for gluing algorithms together and provides a higher level of abstraction. The two libraries differ in the data types on which they operate: the original MLlib predates the introduction of DataFrames, and acts mainly on RDDs of feature vectors. The pipeline API operates on DataFrames.
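
The two layers live in separate packages: the RDD-based algorithms sit under org.apache.spark.mllib, while the pipeline API sits under org.apache.spark.ml. As a rough illustration of the split (the classes shown here are examples only, not code that we use later in the chapter):

// RDD-based MLlib: operates on RDDs of labelled feature vectors.
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

// Pipeline API: operates on DataFrames and chains stages into a Pipeline.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.Pipeline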

In this chapter, we will study the newer pipeline API, diving into MLlib only when the functionality is missing from the pipeline API.

This chapter does not try to teach the machine learning fundamentals behind the algorithms that we present. We assume that the reader has a good enough grasp of machine learning tools and techniques to understand, at least superficially, what the algorithms presented here do, and we defer to better authors for in-depth explanations of the mechanics of statistical learning (we present several references at the end of the chapter).

MLlib is a rich library that is evolving rapidly. This chapter does not aim to give a complete overview of the library. We will work through the construction of a machine learning pipeline to train a spam filter, learning about the parts of MLlib that we need along the way. Having read this chapter, you will have an understanding of how the different parts of the library fit together, and will be able to use the online documentation, or a more specialized book (see references at the end of this chapter), to learn about the parts of MLlib not covered here.

Introducing MLlib – Spam classification

Let's introduce MLlib with a concrete example. We will look at spam classification using the Ling-Spam dataset that we used in Chapter 10, Distributed Batch Processing with Spark. We will create a spam filter that uses logistic regression to estimate the probability that a given message is spam.

We will run through examples using the Spark shell, but you will find an analogous program in LogisticRegressionDemo.scala among the examples for this chapter. If you have not installed Spark, refer to Chapter 10, Distributed Batch Processing with Spark, for installation instructions.

Let's start by loading the e-mails in the Ling-Spam dataset. If you have not done this for Chapter 10, Distributed Batch Processing with Spark, download the data from data.scala4datascience.com/ling-spam.tar.gz or data.scala4datascience.com/ling-spam.zip, depending on whether you want a tar.gz file or a zip file, and unpack the archive. This will create a spam directory and a ham directory containing spam and ham messages, respectively.

Let's use the wholeTextFiles method to load spam and ham e-mails:

scala> val spamText = sc.wholeTextFiles("spam/*")
spamText: RDD[(String, String)] = spam/...

scala> val hamText = sc.wholeTextFiles("ham/*")
hamText: RDD[(String, String)] = ham/...

The wholeTextFiles method creates a key-value RDD where the keys are the file names and the values are the contents of the files:

scala> spamText.first
(String, String) =
(file:spam/spmsga1.txt,"Subject: great part-time summer job! ...")

scala> spamText.count
Long = 481

The algorithms in the pipeline API work on DataFrames. We must therefore convert our key-value RDDs to DataFrames. We define a new case class, LabelledDocument, which contains a message text and a category label identifying whether a message is spam or ham:

scala> case class LabelledDocument(
  fileName:String, 
  text:String, 
  category:String
)
defined class LabelledDocument

scala> val spamDocuments = spamText.map {
  case (fileName, text) => 
    LabelledDocument(fileName, text, "spam")
}
spamDocuments: RDD[LabelledDocument] = MapPartitionsRDD[2] at map

scala> val hamDocuments = hamText.map {
  case (fileName, text) => 
    LabelledDocument(fileName, text, "ham")
}
hamDocuments: RDD[LabelledDocument] = MapPartitionsRDD[3] at map

To create models, we will need all the documents in a single DataFrame. Let's therefore take the union of our two LabelledDocument RDDs, and transform that to a DataFrame. The union method concatenates RDDs together:

scala> val allDocuments = spamDocuments.union(hamDocuments)
allDocuments: RDD[LabelledDocument] = UnionRDD[4] at union

scala> val documentsDF = allDocuments.toDF
documentsDF: DataFrame = [fileName: string, text: string, category: string]

Let's do some basic checks to verify that we have loaded all the documents. We start by persisting the DataFrame in memory to avoid having to re-create it from the raw text files:

scala> documentsDF.persist
documentsDF.type = [fileName: string, text: string, category: string]

scala> documentsDF.show
+--------------------+--------------------+--------+
|            fileName|                text|category|
+--------------------+--------------------+--------+
|file:/Users/pasca...|Subject: great pa...|    spam|
|file:/Users/pasca...|Subject: auto ins...|    spam|
|file:/Users/pasca...|Subject: want bes...|    spam|
|file:/Users/pasca...|Subject: email 57...|    spam|
|file:/Users/pasca...|Subject: n't miss...|    spam|
|file:/Users/pasca...|Subject: amaze wo...|    spam|
|file:/Users/pasca...|Subject: help loa...|    spam|
|file:/Users/pasca...|Subject: beat irs...|    spam|
|file:/Users/pasca...|Subject: email 57...|    spam|
|file:/Users/pasca...|Subject: best , b...|    spam|
|...                                               |
+--------------------+--------------------+--------+

scala> documentsDF.groupBy("category").agg(count("*")).show
+--------+--------+
|category|COUNT(1)|
+--------+--------+
|    spam|     481|
|     ham|    2412|
+--------+--------+

Let's now split the DataFrame into a training set and a test set. We will use the test set to validate the model that we build. For now, we will just use a single split, training the model on 70% of the data and testing it on the remaining 30%. In the next section, we will look at cross-validation, which provides a more rigorous way of checking the accuracy of our models.

We can achieve this 70-30 split using the DataFrame's .randomSplit method:

scala> val Array(trainDF, testDF) = documentsDF.randomSplit(
  Array(0.7, 0.3))
trainDF: DataFrame = [fileName: string, text: string, category: string]
testDF: DataFrame = [fileName: string, text: string, category: string]

The .randomSplit method takes an array of weights and returns an array of DataFrames whose sizes are approximately proportional to the weights. For instance, we passed weights 0.7 and 0.3, indicating that any given row has a 70% chance of ending up in trainDF, and a 30% chance of ending up in testDF. Note that this means the split DataFrames are not of a fixed size: trainDF is approximately, but not exactly, 70% of the size of documentsDF:

scala> trainDF.count / documentsDF.count.toDouble
Double = 0.7013480815762184
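
By default, randomSplit uses a new random seed on every run, so repeating the command gives a different split. If you need a reproducible split, randomSplit also accepts an explicit seed; a minimal sketch (the seed value here is arbitrary):

// Reproducible 70-30 split: the same seed always produces the same partition.
val Array(trainDF, testDF) = documentsDF.randomSplit(Array(0.7, 0.3), seed = 42L)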

If you need a fixed-size sample, use the DataFrame's .sample method to obtain trainDF and then filter documentsDF for the rows not in trainDF, as sketched below.
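
A minimal sketch of this approach, assuming the documentsDF defined above. Note that .sample is itself fraction-based, so the sizes remain approximate, and that except performs a set difference, so any exact duplicate rows would be dropped:

// Alternative split: sample the training set directly, then keep the
// remaining rows as the test set.
val trainDF = documentsDF.sample(withReplacement = false, fraction = 0.7)
val testDF = documentsDF.except(trainDF)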

We are now in a position to start using MLlib. Our attempt at classification will involve performing logistic regression on term-frequency vectors: we will count how often each word appears in each message, and use the frequency of occurrence as a feature. Before jumping into the code, let's take a step back and discuss the structure of machine learning pipelines.
