An example of clustering using K-means in Spark MLlib

To demonstrate clustering further, we will perform unsupervised learning with Spark MLlib on the Saratoga NY Homes dataset, which can be downloaded from http://course1.winona.edu/bdeppa/Stat%20425/Datasets.html. The dataset contains several features of houses located in the suburbs of New York City, namely price, lot size, waterfront, age, land value, new construct, central air, fuel type, heat type, sewer type, living area, pct.college, bedrooms, fireplaces, bathrooms, and the number of rooms. Only a few of these features are shown in the following table:

Price     Lot Size   Waterfront   Age   Land Value   Rooms
132,500   0.09       0            42    5,000        5
181,115   0.92       0            0     22,300       6
109,000   0.19       0            133   7,300        8
155,000   0.41       0            13    18,700       5
86,060    0.11       0            0     15,000       3
120,000   0.68       0            31    14,000       8
153,000   0.4        0            33    23,300       8
170,000   1.21       0            23    146,000      9
90,000    0.83       0            36    222,000      8
122,900   1.94       0            4     212,000      6
325,000   2.29       0            123   126,000      12
Table 1: Sample data from the Saratoga NY Homes dataset

The goal of this clustering exercise is an exploratory analysis that uses the features of each house to find possible neighborhoods, that is, groups of houses located in similar areas. Before performing feature extraction, we need to load and parse the Saratoga NY Homes dataset. This step also includes loading packages and related dependencies, reading the dataset as an RDD, training the model, making predictions, collecting the locally parsed data, and comparing the clusters.

Step 1. Import the related packages:

package com.chapter13.Clustering

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

Step 2. Create a Spark session - the entry point - First, we set the Spark configuration by specifying the application name and the master URL. For simplicity, it runs in standalone mode using all the cores on your machine:

val spark = SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "E:/Exp/")
  .appName("KMeans")
  .getOrCreate()
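Note that E:/Exp/ is a Windows path for the SQL warehouse directory. On Linux or macOS you could point spark.sql.warehouse.dir at any writable local directory, or omit the setting entirely to fall back to the default spark-warehouse folder under the current working directory. A minimal sketch with an assumed path:

// Sketch for Linux/macOS: use a local directory (the path is just an assumption)
val spark = SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .appName("KMeans")
  .getOrCreate()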

Step 3. Load and parse the dataset - Read, parse, and create RDDs from the dataset as follows:

//Start parsing the dataset
val start = System.currentTimeMillis()
val dataPath = "data/Saratoga NY Homes.txt"
//val dataPath = args(0)
val landDF = parseRDD(spark.sparkContext.textFile(dataPath))
.map(parseLand).toDF().cache()
landDF.show()

Note that, to make the preceding code work, you should import the following package:

import spark.sqlContext.implicits._

You will get the following output:

Figure 3: A snapshot of the Saratoga NY Homes dataset

The following parseLand method is used to create a Land object from an array of Double values:

// Function to create a Land object from an array of Double
def parseLand(line: Array[Double]): Land = {
  Land(line(0), line(1), line(2), line(3), line(4), line(5),
    line(6), line(7), line(8), line(9), line(10),
    line(11), line(12), line(13), line(14), line(15))
}

The Land case class, which holds all the features as Double values, is defined as follows:

case class Land(
  Price: Double, LotSize: Double, Waterfront: Double, Age: Double,
  LandValue: Double, NewConstruct: Double, CentralAir: Double,
  FuelType: Double, HeatType: Double, SewerType: Double,
  LivingArea: Double, PctCollege: Double, Bedrooms: Double,
  Fireplaces: Double, Bathrooms: Double, rooms: Double
)

As you already know, to train the K-means model, we need to ensure that all the data points and features are numeric. Therefore, we further need to convert every line of the dataset into an array of Double values as follows:

// Method to transform an RDD of Strings into an RDD of Array[Double]
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
  rdd.map(_.split(",")).map(_.map(_.toDouble))
}
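Note that parseRDD assumes every line of the file is well formed. If your copy of the dataset contains a header row or blank lines, the toDouble calls will throw a NumberFormatException. A defensive variant (purely a sketch, not part of the original code) could filter such lines out first:

import scala.util.Try

// Sketch: skip the header row and any lines that do not parse as numbers
def parseRDDSafe(rdd: RDD[String]): RDD[Array[Double]] = {
  rdd.map(_.split(","))
    .filter(tokens => tokens.nonEmpty && tokens.forall(t => Try(t.trim.toDouble).isSuccess))
    .map(_.map(_.trim.toDouble))
}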

Step 4. Prepare the training set - First, we convert the data frame (that is, landDF) to an RDD of doubles and cache it, so that later we can create a new data frame linking each house to its cluster number:

val rowsRDD = landDF.rdd.map(r => (
  r.getDouble(0), r.getDouble(1), r.getDouble(2),
  r.getDouble(3), r.getDouble(4), r.getDouble(5),
  r.getDouble(6), r.getDouble(7), r.getDouble(8),
  r.getDouble(9), r.getDouble(10), r.getDouble(11),
  r.getDouble(12), r.getDouble(13), r.getDouble(14),
  r.getDouble(15)))
rowsRDD.cache()

Now we need to convert the preceding RDD of doubles into an RDD of dense vectors that the K-means trainer can consume. Note that the first field, Price, is left out of the feature vector because we will use it later as an identifier to link the predictions back to the houses:

// Convert the numeric features (excluding Price) into an RDD of dense vectors
val landRDD = rowsRDD.map(r => Vectors.dense(
  r._2, r._3, r._4, r._5, r._6, r._7, r._8, r._9,
  r._10, r._11, r._12, r._13, r._14, r._15, r._16))
landRDD.cache()

Step 5. Train the K-means model - Train the model by specifying five clusters, 20 iterations, and 10 runs as follows:

val numClusters = 5
val numIterations = 20
val runs = 10
val model = KMeans.train(landRDD, numClusters, numIterations, runs,
  KMeans.K_MEANS_PARALLEL)

The Spark-based implementation of K-means starts by initializing a set of cluster centers using the K-means|| algorithm by Bahmani et al. (Scalable K-Means++, VLDB 2012). This is a variant of K-means++ that tries to find dissimilar cluster centers: it starts with a random center and then performs passes in which further centers are chosen with probability proportional to their squared distance from the current set of centers. It yields a provable approximation to an optimal clustering. The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
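If you want to experiment with the initialization strategy yourself, the MLlib KMeans builder lets you switch between K-means|| and purely random initialization. The following is a minimal sketch (not part of the original listing) that trains a second model with random initialization on the same data for comparison:

// Sketch: train with purely random initialization instead of K-means||
val randomInitModel = new KMeans()
  .setK(numClusters)
  .setMaxIterations(numIterations)
  .setInitializationMode(KMeans.RANDOM) // KMeans.K_MEANS_PARALLEL is the default
  .run(landRDD)
println("Cost with random initialization: " + randomInitModel.computeCost(landRDD))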

 

Step 6. Evaluate the model error rate - The standard K-means algorithm aims at minimizing the sum of the squared (Euclidean) distances between the points of each cluster and their cluster center; this is the objective measured by the Within Set Sum of Squared Errors (WSSSE). Note that this measure on its own cannot tell you how many clusters to use: if you kept adding clusters until every point became its own cluster center, the measure would drop to 0.

Therefore, once you have trained your model by specifying the parameters, you can evaluate the result using the WSSSE, which is essentially the sum of the squared distances from each observation to the center of the cluster it was assigned to among the K clusters. It can be computed as follows:

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WCSSS = model.computeCost(landRDD)
println("Within-Cluster Sum of Squares = " + WCSSS)

The preceding code produces the following value of the Within-Cluster Sum of Squares:

Within-Cluster Sum of Squares = 1.455560123603583E12 
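Before linking the predictions back to the houses, you can also inspect the learned cluster centers directly from the trained model. The following minimal sketch (not part of the original listing) prints the coordinates of each center:

// Print the coordinates of each learned cluster center
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster center $idx: $center")
}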

Step 7. Compute the cluster membership of each house - First, we get the prediction from the model together with the price (which we use as an ID) so that we can link the predictions back to the other information related to each house. Note that we will use the RDD of rows that we prepared in step 4:

// Get the prediction from the model with the ID so we can link them back to other information
val predictions = rowsRDD.map { r =>
  (r._1, model.predict(Vectors.dense(
    r._2, r._3, r._4, r._5, r._6, r._7, r._8, r._9, r._10,
    r._11, r._12, r._13, r._14, r._15, r._16)))
}

However, if the model was trained on all 16 features, including the price, then the price (r._1) must also be included in the vector passed to the predict method so that its dimensionality matches the training data. In that case, the prediction is requested as follows:

val predictions = rowsRDD.map { r =>
  (r._1, model.predict(Vectors.dense(
    r._1, r._2, r._3, r._4, r._5, r._6, r._7, r._8, r._9, r._10,
    r._11, r._12, r._13, r._14, r._15, r._16)))
}

For better visibility and an exploratory analysis, convert the RDD to a DataFrame as follows:

import spark.sqlContext.implicits._
val predCluster = predictions.toDF("Price", "CLUSTER")
predCluster.show()

This should produce the output shown in the following figure:

Figure 4: A snapshot of the clusters predicted

Since there is no distinguishable ID in the dataset, we use the Price field to do the linking. From the preceding figure, you can see which cluster a house with a certain price falls into. Now, for better visibility, let's join the prediction DataFrame with the original DataFrame to get the individual cluster number for each house:

val newDF = landDF.join(predCluster, "Price") 
newDF.show()

You should observe the output in the following figure:

Figure 5: A snapshot of the clusters predicted across each house

To make the analysis, we dumped the output in RStudio and generated the clusters shown in Figure 6. The R script can be found on my GitHub repositories at https://github.com/rezacsedu/ScalaAndSparkForBigDataAnalytics. Alternatively, you can write your own script and do the visualization accordingly.

Figure 6: Clusters of the neighborhoods

Now, for a more extensive analysis and better visibility, we can observe related statistics for each cluster; for example, the statistics for clusters 3 and 4 are shown in Figure 7 and Figure 8, respectively. First, show a sample of the houses in each cluster:

newDF.filter("CLUSTER = 0").show() 
newDF.filter("CLUSTER = 1").show()
newDF.filter("CLUSTER = 2").show()
newDF.filter("CLUSTER = 3").show()
newDF.filter("CLUSTER = 4").show()

Now get the descriptive statistics for each cluster as follows:

newDF.filter("CLUSTER = 0").describe().show()
newDF.filter("CLUSTER = 1").describe().show()
newDF.filter("CLUSTER = 2").describe().show()
newDF.filter("CLUSTER = 3").describe().show()
newDF.filter("CLUSTER = 4").describe().show()

First, let's observe the related statistics of cluster 3 in the following figure:

Figure 7: Statistics on cluster 3

Now let's observe the related statistics of cluster 4 in the following figure:

Figure 8: Statistics on cluster 4

Note that, since the original screenshots were too large to fit on this page, the images were modified and the columns containing the other variables of the houses were removed.

Due to the random nature of this algorithm, you might get different results on successive runs. However, you can make the run reproducible by setting the seed as follows:

val numClusters = 5
val numIterations = 20
val seed = 12345L
// Use the builder API so the seed is applied explicitly
val model = new KMeans()
  .setK(numClusters)
  .setMaxIterations(numIterations)
  .setSeed(seed)
  .run(landRDD)

Step 8. Stop the Spark session - Finally, stop the Spark session using the stop method as follows:

spark.stop()

In the preceding example, we dealt with a very small set of features; common sense and visual inspection would lead us to the same conclusions. This example also exposes some limitations of the K-means algorithm: it is hard to choose a good value of K in advance (one common workaround, comparing the WSSSE across several values of K, is sketched below), different initial partitions can result in different final clusters, and it does not work well with clusters of different sizes and densities or with non-globular shapes.
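One pragmatic way to choose K, under the assumption that the "elbow" of the cost curve marks a reasonable trade-off, is to train the model for a range of candidate K values and compare the resulting WSSSE. The following is a minimal sketch (the range 2 to 10 is arbitrary):

// Elbow method sketch: compare the WSSSE for several candidate values of K
(2 to 10).foreach { k =>
  val candidate = KMeans.train(landRDD, k, numIterations)
  println(s"K = $k, WSSSE = ${candidate.computeCost(landRDD)}")
}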

To overcome these limitations, this book covers some more robust techniques, such as Markov Chain Monte Carlo (MCMC; see also https://en.wikipedia.org/wiki/Markov_chain_Monte_Carlo), as presented in Tribble, Seth D., Markov chain Monte Carlo algorithms using completely uniformly distributed driving sequences, Diss. Stanford University, 2007.