©  Raju Kumar Mishra 2018
Raju Kumar Mishra, PySpark Recipes, https://doi.org/10.1007/978-1-4842-3141-8_9

9. PySpark MLlib and Linear Regression

Raju Kumar Mishra
(1)
Bangalore, Karnataka, India
 
Machine learning has gone through many recent developments and is becoming more popular day by day. People from all domains, including computer science, mathematics, and management, are using machine learning in various projects to find hidden information in data. Big data becomes more interesting when we start applying machine-learning algorithms to it.
PySpark MLlib is a machine-learning library. It is a wrapper over PySpark Core that performs data analysis using machine-learning algorithms. It works on distributed systems and is scalable. It provides implementations of classification, clustering, linear regression, and other machine-learning algorithms. Because PySpark is well suited to iterative computation, many of these algorithms are implemented as iterative algorithms in PySpark MLlib. Apart from efficiency and scalability, the PySpark MLlib APIs are very user-friendly.
Software libraries, which are designed to solve various problems, often come with their own data structures. These data structures are tailored to solving a specific set of problems efficiently. PySpark MLlib comes with many data structures, including dense vectors, sparse vectors, and local and distributed matrices.
Linear regression is one of the most popular machine-learning algorithms. In linear regression, we create a linear mathematical model that relates one or more independent variables to a dependent variable.
In this chapter, we will first move through the various data structures defined in PySpark MLlib. Then we will explore linear regression with MLlib. We will also implement linear regression by using the ridge and lasso methods.
This chapter covers the following recipes:
  • Recipe 9-1. Create a dense vector
  • Recipe 9-2. Create a sparse vector
  • Recipe 9-3. Create local matrices
  • Recipe 9-4. Create a RowMatrix
  • Recipe 9-5. Create a labeled point
  • Recipe 9-6. Apply linear regression
  • Recipe 9-7. Apply ridge regression
  • Recipe 9-8. Apply lasso regression

Recipe 9-1. Create a Dense Vector

Problem

You want to create a dense vector .

Solution

DenseVector is a local vector. In PySpark, we can create a dense vector by using the DenseVector constructor.

How It Works

Creating a dense vector requires us to import the DenseVector class, which has been defined inside the pyspark.mllib.linalg submodule:
>>> from pyspark.mllib.linalg import DenseVector
>>> denseDataList = [1.0,3.4,4.5,3.2]
We have created a list of four elements. This list, denseDataList, has floating-point data elements. We have already imported DenseVector, so we are going to create a dense vector now:
>>> denseDataVector = DenseVector(denseDataList)
>>> print denseDataVector
Here is the output:
[1.0,3.4,4.5,3.2]
We have created a dense vector named denseDataVector. We can index our DenseVector elements. The elements of DenseVector are zero-indexed. In the following lines of code, we’ll first fetch the second element of DenseVector, and then we’ll fetch the first element with index 0. Last, we’ll fetch the third element of denseDataVector by using the index 2.
>>> denseDataVector[1]
Here is the output:
3.3999999999999999
>>> denseDataVector[0]
Here is the output:
1.0
>>> denseDataVector[2]
Here is the output:
4.5
Note
We have already discussed NumPy. It is recommended to use a NumPy array over a Python list to create a dense vector, for efficiency. You can read more about DenseVector and SparseVector at https://spark.apache.org/docs/2.0.0-preview/mllib-data-types.html .
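For example, here is a minimal sketch of building a dense vector from a NumPy array instead of a Python list (the variable names here are ours, not from the recipe):
>>> import numpy as np
>>> denseDataArray = np.array([1.0, 3.4, 4.5, 3.2])     # NumPy array holding the same values
>>> denseVectorFromArray = DenseVector(denseDataArray)  # DenseVector accepts array-like input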

Recipe 9-2. Create a Sparse Vector

Problem

You want to create a sparse vector .

Solution

Sometimes we get a dataset in which most of the elements are 0. Can we avoid storing all those zeros? Yes, we can: we can save such data as a SparseVector object in PySpark MLlib, which stores only the nonzero values and their indices.
You have been given a vector. Many elements of this vector have a value of 0, as you can see in Table 9-1. The values at indices 1, 2, 3, 4, 5, and 6 are 0.0.
Table 9-1.
A Sparse Dataset
+-------+---+---+---+---+---+---+---+---+
| Index | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| Value |1.0|0.0|0.0|0.0|0.0|0.0|0.0|3.2|
+-------+---+---+---+---+---+---+---+---+
So six out of eight values are 0. We are going to create a SparseVector out of this.

How It Works

In this section, we are going to create a SparseVector. First, we have to import the SparseVector class from our submodule pyspark.mllib.linalg:
>>> from pyspark.mllib.linalg import SparseVector
>>> sparseDataList = [1.0,3.2]
>>> sparseDataVector = SparseVector(8,[0,7],sparseDataList)
We have created a sparse vector named sparseDataVector. Let me explain the arguments of SparseVector. The first argument, 8, indicates that our SparseVector has eight elements. The second argument is a list of the indices that hold nonzero elements. The third argument is the list of the nonzero values themselves: we have 1.0 at index 0 and 3.2 at index 7.
Let’s print our sparseDataVector and see the result:
>>> sparseDataVector
Here is the output:
SparseVector(8, {0: 1.0, 7: 3.2})
You can see that it has our numbers at index 0 and index 7. We can also index our SparseVector by using []:
>>> sparseDataVector[1]
Here is the output:
0.0
>>> sparseDataVector[7]
Here is the output:
3.2000000000000002
So we have found that at index 1, we have 0.0; and at index 7, we have 3.2. The total count of nonzero elements can be fetched by using the function numNonzeros(). In the following code line, we have found that in our SparseVector, we have only two nonzero elements:
>>> sparseDataVector.numNonzeros()
Here is the output:
2
We can do many operations on a SparseVector. The following lines of code show how to calculate the squared distance between two given SparseVectors by using the squared_distance() function:
>>> sparseDataList1 = [3.0,1.4,2.5,1.2]
>>> sparseDataVector1 = SparseVector(8,[0,3,4,6],sparseDataList1)
>>> squaredDistance = sparseDataVector.squared_distance(sparseDataVector1)
>>> squaredDistance
Here is the output:
23.890000000000001
We have the squared distance.
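As a quick sanity check, here is a minimal sketch (assuming NumPy is available) that reproduces the same value by densifying both vectors with toArray() and summing the squared differences:
>>> import numpy as np
>>> denseA = sparseDataVector.toArray()     # [1.0, 0, 0, 0, 0, 0, 0, 3.2]
>>> denseB = sparseDataVector1.toArray()    # [3.0, 0, 0, 1.4, 2.5, 0, 1.2, 0]
>>> float(np.sum((denseA - denseB) ** 2))   # should again be about 23.89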

Recipe 9-3. Create Local Matrices

Problem

You want to create local matrices .

Solution

A local matrix is stored on a single machine, and, obviously, it is local to that machine. The indices of a local matrix are of the integer type. And the values of a local matrix are of the double type. A local matrix comes in two flavors: a dense matrix and a sparse matrix. The elements of a dense matrix are stored in a single array, in column major order. In a sparse matrix, nonzero values are stored in a compressed sparse column format. We can create a dense matrix by using the dense() method defined in the Matrices class. The dense() method is a static method, so it is not necessary to create an object of the Matrices class. Similarly, we can create a sparse local matrix by using the sparse() method defined in the Matrices class. This method is also a static method.

How It Works

First, we will create a local dense matrix. Then we will create a local sparse matrix. We need the Matrices class from pyspark.mllib.linalg, which we import here under the alias localMtrix, and a Python list of values:
>>> from pyspark.mllib.linalg import Matrices as localMtrix
>>> denseDataList = [1.0,3.4,4.5,3.2]
>>> ourDenseMatrix = localMtrix.dense(numRows = 2, numCols = 2, values = denseDataList)
We have created a local dense matrix, ourDenseMatrix. We should discuss the arguments of the dense() method. We have provided three arguments to it. The first argument defines the number of rows in our matrix. The second argument is the number of columns. The third argument is a list of matrix elements. Now let’s print the matrix we have created and see how it looks:
>>> ourDenseMatrix
Here is the output:
DenseMatrix(2, 2, [1.0, 3.4, 4.5, 3.2], False)
A new thing appears in the result: False. What is this? It tells us whether the matrix is transposed (here it is not).
You might be wondering how to visualize the structure of the dense matrix that we have created. The function to help us is toArray(). This will transform our dense matrix to a NumPy array:
>>> ourDenseMatrix.toArray()
Here is the output:
array([[ 1. ,  4.5],
           [ 3.4,  3.2]])
We can see that our data points have been filled into the dense matrix by column. Let me explain: the first two values of denseDataList fill the first column of the dense matrix, and the remaining two elements fill the second column.
We create a sparse matrix with the following line of code:
>>> sparseDataList = [1.0,3.2]
Our sparse matrix has only two nonzero elements. We want to create a 2 × 2 matrix, so our matrix will also have two 0 elements. We know that PySpark stores local sparse matrices in a compressed sparse column format, so we have to provide column pointers as an argument. We want to create a diagonal matrix using the numbers 1.0 and 3.2:
>>> ourSparseMatrix = localMtrix.sparse(numRows = 2,  numCols = 2, colPtrs = [0,1,2],  rowIndices = [0,1], values = sparseDataList)
>>> ourSparseMatrix.toArray()
Here is the output:
array([[ 1. ,  0. ],
       [ 0. ,  3.2]])
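To make the colPtrs and rowIndices arguments more concrete, here is a minimal sketch with made-up values for a 3 × 3 matrix. colPtrs has numCols + 1 entries, and colPtrs[j+1] - colPtrs[j] is the number of nonzero values stored for column j:
>>> # Column 0 holds 5.0 (row 0) and 7.0 (row 2), column 1 is empty,
>>> # and column 2 holds 9.0 (row 1).
>>> cscExample = localMtrix.sparse(numRows = 3, numCols = 3,
...                                colPtrs = [0, 2, 2, 3],
...                                rowIndices = [0, 2, 1],
...                                values = [5.0, 7.0, 9.0])
>>> cscExample.toArray()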
Note
You can read more about compressed sparse column format on Wikipedia, https://en.wikipedia.org/wiki/Sparse_matrix .

Recipe 9-4. Create a Row Matrix

Problem

You want to create a row matrix .

Solution

RowMatrix is a distributed matrix. It is a row-oriented distributed matrix whose row indices are meaningless (they carry no information).

How It Works

The class RowMatrix is in the PySpark submodule pyspark.mllib.linalg.distributed. Let’s first import this class. Then we will create a RowMatrix.
>>> from pyspark.mllib.linalg.distributed import RowMatrix as rm
To create a RowMatrix, we first need an RDD of vectors, although an RDD of lists will also work. We have our data in a nested list, dataList:
>>> dataList  = [[ 94.88,  82.04,  52.57],
...                         [ 35.85,  26.9 ,   3.63],
...                         [ 41.76,  69.67,  50.62],
...                         [ 90.45,  54.66,  64.07]]
The nested list dataList has four rows and three columns. Now we have to create an RDD out of this dataList:
>>> dataListRDD = sc.parallelize(dataList,4)
>>> ourRowMatrix = rm(rows = dataListRDD, numRows = 4 , numCols = 3)
We have created our RowMatrix. The first argument of RowMatrix is the RDD of lists. The second and third arguments are the number of rows and the number of columns, respectively.
>>> ourRowMatrix.numRows()
Here is the output:
4L
>>> ourRowMatrix.numCols()
Here is the output:
3L
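The rows of a RowMatrix live in an ordinary RDD of vectors, so we can inspect them directly. Recent Spark versions also expose computeColumnSummaryStatistics() for per-column statistics; here is a minimal sketch, assuming these methods are available in your Spark version:
>>> ourRowMatrix.rows.take(2)                # the underlying RDD of row vectors
>>> columnStats = ourRowMatrix.computeColumnSummaryStatistics()
>>> columnStats.mean()                       # per-column means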

Recipe 9-5. Create a Labeled Point

Problem

You want to create a labeled point .

Solution

A labeled point is a basic data structure for linear regression and classification algorithms. It consists of a sparse or dense vector of features associated with a label. Labels are of the double data type, so a labeled point can be used in both regression and classification. The LabeledPoint class is defined in the pyspark.mllib.regression submodule. Let's create LabeledPoint data.

How It Works

First we define a nested list:
>>> from pyspark.mllib.regression import LabeledPoint
>>> labeledPointData = [[3.09,1.97,3.73,1],
...                     [2.96,2.15,4.16,1],
...                     [2.87,1.93,4.39,1],
...                     [3.02,1.55,4.43,1],
...                     [1.8,3.65,2.08,2],
...                     [1.36,4.43,1.95,2],
...                     [1.71,4.35,1.94,2],
...                     [1.03,3.75,2.12,2],
...                     [2.3,3.59,1.99,2]]
The last element in each inner list of our nested list labeledPointData is the label. Let's parallelize the data:
>>> labeledPointDataRDD = sc.parallelize(labeledPointData, 4)
>>> labeledPointDataRDD.take(4)
Here is the output:
[[3.09, 1.97, 3.73, 1],
 [2.96, 2.15, 4.16, 1],
 [2.87, 1.93, 4.39, 1],
 [3.02, 1.55, 4.43, 1]]
>>> labeledPointRDD = labeledPointDataRDD.map(lambda data : LabeledPoint(data[3],data[0:3]))
>>> labeledPointRDD.take(4)
Here is the output:
[LabeledPoint(1.0, [3.09,1.97,3.73]),
 LabeledPoint(1.0, [2.96,2.15,4.16]),
 LabeledPoint(1.0, [2.87,1.93,4.39]),
 LabeledPoint(1.0, [3.02,1.55,4.43])]
We have created our labeledPointRDD. You can see that the labels have been converted to the double data type.
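Because a labeled point accepts either dense or sparse features, the same structure can be built from a SparseVector. Here is a minimal sketch with made-up values:
>>> from pyspark.mllib.linalg import SparseVector
>>> sparseFeaturePoint = LabeledPoint(2.0, SparseVector(3, [0, 2], [1.8, 2.08]))
>>> sparseFeaturePoint.label      # the label, stored as a double
>>> sparseFeaturePoint.features   # the sparse feature vector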

Recipe 9-6. Apply Linear Regression

Problem

You want to apply linear regression .

Solution

Linear regression is a supervised machine-learning algorithm. Here we fit a linear model that relates one or more independent variables to a dependent variable.
We have been given a file, linearRegressionData.csv. This file consists of four columns. If we visualize the file data, it looks as follows:
+-----+----+----+----+
| dvs1|ivs1|ivs2|ivs3|
+-----+----+----+----+
|34.63|5.53|5.58|5.41|
|40.89|3.89|6.48|6.97|
|37.25|5.07| 4.5| 6.5|
|45.09|5.81|5.71|8.59|
| 39.4|5.61|5.79|6.77|
+-----+----+----+----+
The column dvs1 is the dependent variable, which depends on the three independent variables ivs1, ivs2, and ivs3. We have to create a mathematical model that will show a linear relationship between the dependent and independent variables. The linear regression model’s mathematical formula is depicted in Figure 9-1.
Figure 9-1.
Mathematical formula for the linear regression model: dvs1 = intercept + (w1 × ivs1) + (w2 × ivs2) + (w3 × ivs3), where w1, w2, and w3 are the weights
We have to estimate the value of the intercept and weights.

How It Works

Let’s create the model step-by-step.

Step 9-6-1. Reading CSV File Data

Our regression data is in linearRegressionData.csv. We have to read this data and transform it into labeled points in order to run linear regression analysis on it. A good strategy is to first read the file by using the spark.read.csv() function, which we know returns a DataFrame, and then transform that DataFrame into an RDD of labeled points. So first let's read the file.
>>> regressionDataFrame = spark.read.csv('file:///home/pysparkbook/bData/linearRegressionData.csv',header=True, inferSchema = True)
>>> regressionDataFrame.show(5)
Here is the output, showing only the top five rows:
+-----+----+----+----+
| dvs1|ivs1|ivs2|ivs3|
+-----+----+----+----+
|34.63|5.53|5.58|5.41|
|40.89|3.89|6.48|6.97|
|37.25|5.07| 4.5| 6.5|
|45.09|5.81|5.71|8.59|
| 39.4|5.61|5.79|6.77|
+-----+----+----+----+
After reading the file, we have the DataFrame. The following line of code transforms our DataFrame to an RDD:
>>> regressionDataRDDDict = regressionDataFrame.rdd
>>> regressionDataRDDDict.take(5)
Here is the output:
[Row(dvs1=34.63, ivs1=5.53, ivs2=5.58, ivs3=5.41),
 Row(dvs1=40.89, ivs1=3.89, ivs2=6.48, ivs3=6.97),
 Row(dvs1=37.25, ivs1=5.07, ivs2=4.5, ivs3=6.5),
 Row(dvs1=45.09, ivs1=5.81, ivs2=5.71, ivs3=8.59),
 Row(dvs1=39.4, ivs1=5.61, ivs2=5.79, ivs3=6.77)]
We have seen the output of regressionDataRDDDict after the transformation to an RDD. It is an RDD of Row objects. We met Row objects while creating DataFrames, but an RDD of Row objects is not what we need here. You can see that the Row objects hold their data in key/value format, so we need one more transformation to keep only the values in our RDD:
>>> regressionDataRDD = regressionDataFrame.rdd.map(list)
>>> regressionDataRDD.take(5)
Here is the output:
[[34.63, 5.53, 5.58, 5.41],
 [40.89, 3.89, 6.48, 6.97],
 [37.25, 5.07, 4.5, 6.5],
 [45.09, 5.81, 5.71, 8.59],
 [39.4, 5.61, 5.79, 6.77]]
Passing list as the argument of the RDD map() function has transformed each Row into a plain list, a format from which we can easily create our labeled point RDD. In the following step, we are going to create a LabeledPoint RDD.

Step 9-6-2. Creating an RDD of the Labeled Point

In order to run linear regression, we have to transform our data into labeled points. As we discussed, the first column of our RDD is the dependent variable, which depends on the rest of the variables. Therefore, the first value of every RDD element is our label, and the rest are our features. Now we can create the LabeledPoint RDD. We know that to use LabeledPoint, we have to import the class:
>>> from pyspark.mllib.regression import LabeledPoint
>>> regressionDataLabelPoint = regressionDataRDD.map(lambda data : LabeledPoint(data[0],data[1:4]))
The map() function transforms our RDD to a LabeledPoint RDD. The first argument of LabeledPoint is the label, which we provide as data[0] in the preceding code. Let's take five elements out of the LabeledPoint RDD regressionDataLabelPoint by using the take() function with 5 as its argument:
>>> regressionDataLabelPoint.take(5)
Here is the output:
[LabeledPoint(34.63, [5.53,5.58,5.41]),
 LabeledPoint(40.89, [3.89,6.48,6.97]),
 LabeledPoint(37.25, [5.07,4.5,6.5]),
 LabeledPoint(45.09, [5.81,5.71,8.59]),
 LabeledPoint(39.4, [5.61,5.79,6.77])]
We have created the required LabeledPoint RDD. As we discussed, linear regression is a supervised machine-learning algorithm. Therefore, we first divide our dataset into training and testing datasets. The training dataset will be used to create the model, and then we'll apply the testing data to check the accuracy of the linear regression model we have created. In the following step, we will divide our dataset into training and testing datasets.

Step 9-6-3. Dividing Training and Testing Data

PySpark provides the randomSplit() function , which we can use to divide our datasets into training and testing datasets:
>>> regressionLabelPointSplit = regressionDataLabelPoint.randomSplit([0.7,0.3])
We provide the list [0.7, 0.3] as an argument. This list indicates that we want roughly 70 percent of the data points in our training dataset and the rest in our testing dataset. Our dataset has a total of 30 records; in this run, 22 records went to the training dataset, and 8 went to the testing dataset.
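Note that randomSplit() samples the data randomly, so the exact counts can vary from run to run. If you need a reproducible split, you can pass an optional seed; here is a minimal sketch (the seed value 1234 is arbitrary):
>>> regressionLabelPointSplit = regressionDataLabelPoint.randomSplit([0.7,0.3], seed = 1234)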
>>> regressionLabelPointTrainData = regressionLabelPointSplit[0]
>>> regressionLabelPointTrainData.take(5)
Here is the output:
[LabeledPoint(34.63, [5.53,5.58,5.41]),
 LabeledPoint(45.09, [5.81,5.71,8.59]),
 LabeledPoint(39.4, [5.61,5.79,6.77]),
 LabeledPoint(33.25, [5.33,5.78,4.94]),
 LabeledPoint(44.17, [6.11,6.18,8.2])]
The training dataset is ready:
>>> regressionLabelPointTrainData.count()
Here is the output:
22
As we discussed, there are 22 data points in our training dataset:
>>> regressionLabelPointTestData = regressionLabelPointSplit[1]
>>> regressionLabelPointTestData.take(5)
Here is the output:
[LabeledPoint(40.89, [3.89,6.48,6.97]),
 LabeledPoint(37.25, [5.07,4.5,6.5]),
 LabeledPoint(42.92, [5.39,6.59,7.56]),
 LabeledPoint(38.23, [4.19,6.47,6.52]),
 LabeledPoint(36.33, [5.65,5.78,5.47])]
>>> regressionLabelPointTestData.count()
Here is the output:
8
The testing dataset has been created as regressionLabelPointTestData. The count() function on the test data ensures that we have eight data points in our testing dataset.
Now that we have the training and testing datasets, we are ready to fit the regression model to our training dataset. In the following step, we are going to create our linear regression model.

Step 9-6-4. Creating a Linear Regression Model

PySpark uses stochastic gradient descent (SGD), an optimization algorithm, to estimate the coefficients of the linear regression model. The PySpark class LinearRegressionWithSGD is used to perform the operations related to linear regression.
Let’s import LinearRegressionWithSGD; we’ll import it as lrSGD:
>>> from pyspark.mllib.regression import LinearRegressionWithSGD as lrSGD
>>> ourModelWithLinearRegression  = lrSGD.train(
                                    data = regressionLabelPointTrainData,
...                                             iterations = 200,
...                                             step = 0.02,
...                                             intercept = True)
There is a static method, train(), defined in the LinearRegressionWithSGD class; it is used to create a linear regression model. The first argument of the train() method is our training data. SGD is an iterative algorithm, so we provide the number of iterations as the second argument. The third parameter, step, defines the step size (learning rate) used by the SGD algorithm. If the model should include an intercept term, we set intercept to True. Finally, this creates our linear regression model, named ourModelWithLinearRegression.
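The train() method also accepts a few optional arguments beyond the ones used here, such as regParam (a regularization parameter) and miniBatchFraction (the fraction of data sampled in each SGD iteration). Here is a minimal sketch with those defaults spelled out; the values are illustrative only:
>>> anotherLinearModel = lrSGD.train(data = regressionLabelPointTrainData,
...                                  iterations = 200,
...                                  step = 0.02,
...                                  miniBatchFraction = 1.0,  # default: use all data in each iteration
...                                  regParam = 0.0,           # default: no regularization
...                                  intercept = True)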
>>> ourModelWithLinearRegression.intercept
Here is the output:
1.3475409224629387
>>> ourModelWithLinearRegression.weights
Here is the output:
DenseVector([1.7083, 2.1529, 2.3226])
We can write out our regression model by using the intercept and weights. The resulting model is shown in Figure 9-2.
Figure 9-2.
Our regression model: dvs1 = 1.3475 + (1.7083 × ivs1) + (2.1529 × ivs2) + (2.3226 × ivs3)
Note
You can read more about stochastic gradient descent on Wikipedia, https://en.wikipedia.org/wiki/Stochastic_gradient_descent .

Step 9-6-5. Saving the Created Model

Sometimes we have to save the created model and use it in the future. We can save our model by using the save() method :
>>> ourModelWithLinearRegression.save(sc, '/home/pysparkbook/ourModelWithLinearRegression')
The first argument of the save() method is the SparkContext. The second argument is the path of the directory where you want to save your model. We have saved our model, but how do we read it back? Reading the saved model requires the load() method, which is defined in the LinearRegressionModel class.
>>> from pyspark.mllib.regression import LinearRegressionModel as linearRegressModel
>>> ourModelWithLinearRegressionReloaded = linearRegressModel.load(sc, '/home/pysparkbook/ourModelWithLinearRegression')
The saved model has been reloaded, so now we can use it. Since we have reloaded the model, let’s check whether we can get the intercept and weight values from the reloaded model:
>>> ourModelWithLinearRegressionReloaded.intercept
Here is the output:
1.34754092246
>>> ourModelWithLinearRegressionReloaded.weights
Here is the output:
DenseVector([1.7083, 2.1529, 2.3226])
It is clear that we have reloaded the model.

Step 9-6-6. Predicting Data by Using the Model

Whenever we create a mathematical model, we should check its credibility. To check our model's credibility, we will make an RDD of the actual and predicted values for our test data:
>>> actualDataandLinearRegressionPredictedData = regressionLabelPointTestData.map(lambda data : (float(data.label) , float(ourModelWithLinearRegression.predict(data.features))))
The predict() method takes the features data (the independent variables) and returns the predicted value of the dependent variable. Now we have an RDD of the actual and predicted data.
>>> actualDataandLinearRegressionPredictedData.take(5)
Here is the output:
[(40.89, 38.1322613341641),
 (37.25, 34.79375295252528),
 (42.92, 42.30191258048799),
 (38.23, 37.57804867136557),
 (36.33, 36.14796052442989)]

Step 9-6-7. Evaluating the Model We Created

After getting the RDD of actual and predicted data, we will calculate evaluation metrics. We will first calculate the root-mean-square error. The mathematical formula for a root-mean-square error is given in Figure 9-3.
Figure 9-3.
Mathematical formula for a root-mean-square error: RMSE = sqrt((1/n) × Σ(dvs1_i - dvs1pred_i)^2), summed over the n data points
Here, dvs1_i is the actual value of the dependent variable for the ith data point, dvs1pred_i is the corresponding predicted value, and n is the number of data points.
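As a minimal cross-check of this formula, reusing the RDD of actual and predicted values from the previous step, the same quantity can be computed by hand:
>>> from math import sqrt
>>> n = actualDataandLinearRegressionPredictedData.count()
>>> sumOfSquaredErrors = actualDataandLinearRegressionPredictedData.map(
...         lambda pair : (pair[0] - pair[1]) ** 2).sum()
>>> rmseByHand = sqrt(sumOfSquaredErrors / float(n))  # should match rootMeanSquaredError below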
We have to import the RegressionMetrics class to evaluate the model:
>>> from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs
>>> ourLinearRegressionModelMetrics = rmtrcs(actualDataandLinearRegressionPredictedData)
>>> ourLinearRegressionModelMetrics.rootMeanSquaredError
Here is the output:
1.8446573587605941
The value of our root-mean-square error is 1.844657. Similarly, we calculate the value of R2:
>>> ourLinearRegressionModelMetrics.r2
Here is the output:
0.47423120771913974
The value of R2 is low, even less than 0.5. So is this a good model? I say no. So another question is, can we improve the efficiency of our model? The answer is yes; I have done it just by adjusting the learning step size and the number of iterations:
>>> ourModelWithLinearRegression  = lrSGD.train(data = regressionLabelPointTrainData,
...                                             iterations = 100,
...                                             step = 0.05,
...                                             intercept = True)
>>> actualDataandLinearRegressionPredictedData = regressionLabelPointTestData.map(lambda data : (float(data.label) , float(ourModelWithLinearRegression.predict(data.features))))
>>> from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs
>>> ourLinearRegressionModelMetrics = rmtrcs(actualDataandLinearRegressionPredictedData)
>>> ourLinearRegressionModelMetrics.rootMeanSquaredError
Here is the output:
1.7856232547826518
>>> ourLinearRegressionModelMetrics.r2
Here is the output:
0.6377723547885376
So, finally, we have increased the value of R2.

Recipe 9-7. Apply Ridge Regression

Problem

You want to apply ridge regression .

Solution

You have been given a dataset in the CSV file autoMPGDataModified.csv. This dataset has five columns. We have to fit a linear regression model to this data by using ridge regularization. The first column is miles per gallon, which is the dependent variable in this case.
+----+------------+----------+------+------------+
| mpg|displacement|horsepower|weight|acceleration|
+----+------------+----------+------+------------+
|18.0|       307.0|        18|  3504|        12.0|
|15.0|       350.0|        36|  3693|        11.5|
|18.0|       318.0|        30|  3436|        11.0|
|16.0|       304.0|        30|  3433|        12.0|
|17.0|       302.0|        25|  3449|        10.5|
+----+------------+----------+------+------------+
I have taken this dataset from the UCI Machine Learning Repository ( https://archive.ics.uci.edu/ml/datasets/auto+mpg ) and removed some columns. According to the web page, the dataset was taken from the StatLib library maintained at Carnegie Mellon University and was used in the 1983 American Statistical Association Exposition.
You might be wondering about the difference between plain linear regression and linear regression with the ridge penalty. We know that SGD minimizes an error (loss) function. In ridge regression, an extra regularization term is added to that error function, as shown in Figure 9-4.
Figure 9-4.
Extra error term in ridge regression: the penalty α × (w1^2 + w2^2 + ...), that is, the regularization parameter times the sum of the squared weights
Let’s perform ridge regression on the given dataset.

How It Works

Step 9-7-1. Reading the CSV File Data

We have to read the data and transform it to RDD, as we have done in previous recipes:
>>> autoDataFrame = spark.read.csv('file:///home/pysparkbook/bData/autoMPGDataModified.csv',header=True, inferSchema = True)
>>> autoDataFrame.show(5)
Here is the output, showing only the top five rows:
+----+------------+----------+------+------------+
| mpg|displacement|horsepower|weight|acceleration|
+----+------------+----------+------+------------+
|18.0|       307.0|        18|  3504|        12.0|
|15.0|       350.0|        36|  3693|        11.5|
|18.0|       318.0|        30|  3436|        11.0|
|16.0|       304.0|        30|  3433|        12.0|
|17.0|       302.0|        25|  3449|        10.5|
+----+------------+----------+------+------------+
>>> autoDataFrame.printSchema()
Here is the output:
root
 |-- mpg: double (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: double (nullable = true)
>>> autoDataRDDDict = autoDataFrame.rdd
>>> autoDataRDDDict.take(5)
Here is the output:
[Row(mpg=18.0, displacement=307.0, horsepower=18, weight=3504, acceleration=12.0),
Row(mpg=15.0, displacement=350.0, horsepower=36, weight=3693, acceleration=11.5),
Row(mpg=18.0, displacement=318.0, horsepower=30, weight=3436, acceleration=11.0),
Row(mpg=16.0, displacement=304.0, horsepower=30, weight=3433, acceleration=12.0),
Row(mpg=17.0, displacement=302.0, horsepower=25, weight=3449, acceleration=10.5)]
We transform our DataFrame to an RDD so that we can transform it further into the LabeledPoint RDD:
>>> autoDataRDD = autoDataFrame.rdd.map(list)
>>> autoDataRDD.take(5)
Here is the output:
[[18.0, 307.0, 18, 3504, 12.0],
 [15.0, 350.0, 36, 3693, 11.5],
 [18.0, 318.0, 30, 3436, 11.0],
 [16.0, 304.0, 30, 3433, 12.0],
 [17.0, 302.0, 25, 3449, 10.5]]

Step 9-7-2. Creating an RDD of the Labeled Points

After getting the RDD, we have to transform the RDD to the LabeledPoint RDD :
>>> from pyspark.mllib.regression import LabeledPoint
>>> autoDataLabelPoint = autoDataRDD.map(lambda data : LabeledPoint(data[0],[data[1]/10,data[2],float(data[3])/100,data[4]]))
Looking at the dataset, we can see that it is better to normalize the data. Therefore, we divide the displacement by 10 and the weight by 100:
>>> autoDataLabelPoint.take(5)
Here is the output:
[LabeledPoint(18.0, [30.7,18.0,35.04,12.0]),
 LabeledPoint(15.0, [35.0,36.0,36.93,11.5]),
 LabeledPoint(18.0, [31.8,30.0,34.36,11.0]),
 LabeledPoint(16.0, [30.4,30.0,34.33,12.0]),
 LabeledPoint(17.0, [30.2,25.0,34.49,10.5])]
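As an alternative to this manual scaling, MLlib also provides a StandardScaler that standardizes each feature column. Here is a minimal sketch (the variable names are ours); the labels are zipped back together with the scaled feature vectors at the end:
>>> from pyspark.mllib.feature import StandardScaler
>>> autoFeaturesRDD = autoDataRDD.map(lambda data : data[1:5])   # features only
>>> autoLabelsRDD = autoDataRDD.map(lambda data : data[0])       # labels only
>>> scalerModel = StandardScaler(withMean = True, withStd = True).fit(autoFeaturesRDD)
>>> scaledFeaturesRDD = scalerModel.transform(autoFeaturesRDD)
>>> scaledAutoLabelPoint = autoLabelsRDD.zip(scaledFeaturesRDD).map(
...         lambda pair : LabeledPoint(pair[0], pair[1]))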

Step 9-7-3. Dividing Training and Testing Data

It is time to divide our dataset into training and testing datasets:
>>> autoDataLabelPointSplit = autoDataLabelPoint.randomSplit([0.7,0.3])
>>> autoDataLabelPointTrain = autoDataLabelPointSplit[0]
>>> autoDataLabelPointTest = autoDataLabelPointSplit[1]
>>> autoDataLabelPointTrain.take(5)
Here is the output:
[LabeledPoint(18.0, [30.7,18.0,35.04,12.0]),
 LabeledPoint(15.0, [35.0,36.0,36.93,11.5]),
 LabeledPoint(18.0, [31.8,30.0,34.36,11.0]),
 LabeledPoint(16.0, [30.4,30.0,34.33,12.0]),
 LabeledPoint(17.0, [30.2,25.0,34.49,10.5])]
>>> autoDataLabelPointTest.take(5)
Here is the output:
[LabeledPoint(14.0, [45.5,48.0,44.25,10.0]),
 LabeledPoint(15.0, [39.0,41.0,38.5,8.5]),
 LabeledPoint(15.0, [40.0,30.0,37.61,9.5]),
 LabeledPoint(24.0, [11.3,92.0,23.72,15.0]),
 LabeledPoint(26.0, [9.7,51.0,18.35,20.5])]
>>> autoDataLabelPointTest.count()
Here is the output:
122
>>> autoDataLabelPointTrain.count()
Here is the output:
269

Step 9-7-4. Creating a Linear Regression Model

We can create our model by using the train() method of the RidgeRegressionWithSGD class. Therefore, we first have to import the RidgeRegressionWithSGD class and then run the train() method:
>>> from pyspark.mllib.regression import RidgeRegressionWithSGD  as ridgeSGD
>>> ourModelWithRidge  = ridgeSGD.train(
data = autoDataLabelPointTrain,
iterations = 400,
step = 0.0005,
regParam = 0.05,
intercept = True
)
In our train() method, there is one argument that we did not use in the previous recipe: regParam. The regParam argument is the regularization parameter, alpha, shown previously in Figure 9-4.
>>> ourModelWithRidge.intercept
Here is the output:
1.0192595005891258
>>> ourModelWithRidge.weights
Here is the output:
DenseVector([-0.0575, 0.2025, 0.1961, 0.3503])
We have created our model and have the intercept and coefficients.

Step 9-7-5. Saving the Created Model

We can save our model and reload it as we did in the previous recipe. The following code first saves the model and then reloads it. After reloading the model, we will check whether it is working correctly.
>>> ourModelWithRidge.save(sc, '/home/pysparkbook/ourModelWithRidge')
>>> from  pyspark.mllib.regression import RidgeRegressionModel as ridgeRegModel
>>> ourModelWithRidgeReloaded = ridgeRegModel.load(sc, '/home/pysparkbook/ourModelWithRidge')
>>> ourModelWithRidgeReloaded.intercept
Here is the output:
1.01925950059
>>> ourModelWithRidgeReloaded.weights
Here is the output:
DenseVector([-0.0575, 0.2025, 0.1961, 0.3503])
Our saved model is working correctly.

Step 9-7-6. Predicting the Data by Using the Model

In this step, we are going to create an RDD of actual and predicted data. The predicted data will be calculated by using the predict() function .
>>> actualDataandRidgePredictedData = autoDataLabelPointTest.map(lambda data : [float(data.label) , float(ourModelWithRidge.predict(data.features))])
>>> actualDataandRidgePredictedData.take(5)
Here is the output:
[[18.0, 15.857286660271024],
 [16.0, 16.28216643081738],
 [17.0, 14.787196092732607],
 [15.0, 17.60672713589945],
 [14.0, 17.67800889949583]]

Step 9-7-7. Evaluating the Model We Have Created

We have to again find the root-mean-square error:
>>> ourRidgeModelMetrics = rmtrcs(actualDataandRidgePredictedData)
>>> ourRidgeModelMetrics.rootMeanSquaredError
Here is the output:
8.149263319131556
This is the error value. The higher the value, the less accurate the model is. We have calculated the root-mean-square error, and we have checked the credibility of the model.
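If you also want the R2 value of the ridge model, for comparison with the plain linear regression recipe, RegressionMetrics exposes it in the same way (the exact value depends on your random split):
>>> ourRidgeModelMetrics.r2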

Recipe 9-8. Apply Lasso Regression

Problem

You want to apply lasso regression .

Solution

Linear regression with lasso regularization helps keep a model from overfitting; the lasso penalty shrinks the coefficients and can drive some of them all the way to zero. In the case of lasso, at the time of error minimization, we add the term shown in Figure 9-5.
Figure 9-5.
Extra error term in lasso regression: the penalty α × (|w1| + |w2| + ...), that is, the regularization parameter times the sum of the absolute values of the weights

How It Works

Step 9-8-1. Creating a Linear Regression Model with Lasso

We have already created LabeledPoint, containing auto data. We can apply the train() method defined in the LassoWithSGD class:
>>> from pyspark.mllib.regression import LassoWithSGD  as lassoSGD
>>> ourModelWithLasso  = lassoSGD.train(data = autoDataLabelPointTrain, iterations = 400, step = 0.0005,regParam = 0.05, intercept = True)
We have created our model.
>>> ourModelWithLasso.intercept
Here is the output:
1.020329086499831
>>> ourModelWithLasso.weights
Here is the output:
DenseVector([-0.063, 0.2046, 0.198, 0.3719])
We have the intercept and weight of the model.

Step 9-8-2. Predicting the Data Using the Lasso Model

In order to get the RDD of actual and predicted data, we are going to use the same strategy used in previous recipes:
>>> actualDataandLassoPredictedData = autoDataLabelPointTest.map(lambda data : (float(data.label) , float(ourModelWithLasso.predict(data.features))))
>>> actualDataandLassoPredictedData.take(5)
Here is the output:
[(15.0, 17.768596038896607),
 (16.0, 16.5021818747879),
 (17.0, 14.965800201626084),
 (15.0, 17.734571412337576),
 (15.0, 17.154509770352835)]

Step 9-8-3. Evaluating the Model We Have Created

Now we have to evaluate the model, and, needless to say, we are going to use the same strategy as before:
>>> from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs
>>> ourLassoModelMetrics = rmtrcs(actualDataandLassoPredictedData)
>>> ourLassoModelMetrics.rootMeanSquaredError
Here is the output:
7.030519540791776
We have found the root-mean-square error.