In this example, we're going to implement linear regression, which is just a way of fitting a line to a set of data. We'll take a bunch of fabricated data that we have in two dimensions, and try to fit a line to it with a linear model.
We're going to separate our data into two sets, one for building the model and one for evaluating the model, and we'll compare how well this linear model does at actually predicting real values. First of all, in Spark 2, if you're going to be doing stuff with the SparkSQL interface and using Datasets, you've got to be using a SparkSession object instead of a SparkContext. To set one up, you do the following:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("LinearRegression").getOrCreate()
Okay, so you use SparkSession.builder, give it an appName, and call getOrCreate().
This is interesting, because once you've created a Spark session, if it terminates unexpectedly, you can actually recover the next time you run it: if we have a checkpoint directory, getOrCreate can restart where it left off.
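To make the "get or create" behavior concrete, here's a minimal sketch reusing the appName from above: calling getOrCreate() a second time hands back the session that's already running rather than constructing another one.

from pyspark.sql import SparkSession

# The first call builds the session; any later builder.getOrCreate()
# returns that same live session instead of creating a new one.
spark = SparkSession.builder.appName("LinearRegression").getOrCreate()
sameSession = SparkSession.builder.getOrCreate()
print(spark is sameSession)  # True: the existing session was reused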
Now, we're going to use this regression.txt file that I have included with the course materials:
inputLines = spark.sparkContext.textFile("regression.txt")
That is just a text file with comma-delimited values in two columns, and they're just two columns of randomly generated, linearly correlated data. It can represent whatever you want. Let's imagine it represents heights and weights, for example: the first column might represent heights, and the second column weights.
In this example, the heights are the labels and the weights are the features; maybe we're trying to predict heights based on weight. It can be anything, it doesn't matter. The data is all normalized to values between -1 and 1, and there's no real meaning to the scale anywhere, so you can pretend it means anything you want, really.
To use this with MLlib, we need to transform our data into the format it expects:
from pyspark.ml.linalg import Vectors

data = inputLines.map(lambda x: x.split(",")).map(lambda x: (float(x[0]), Vectors.dense(float(x[1]))))
The first thing we're going to do is split that data up with this map function that just splits each line into two distinct values in a list, and then we're going to map that to the format that MLlib expects. That's going to be a floating point label, and then a dense vector of the feature data.
In this case, we only have one bit of feature data, the weight, so our vector has just one element in it. But even if it's just one thing, the MLlib linear regression model requires a dense vector there. This is like a LabeledPoint in the older API, but we have to do it the hard way here.
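Just for comparison, here's roughly how one row looks in the older RDD-based API versus what we're building here; the 0.5 and 0.25 values are made up purely for illustration.

from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.linalg import Vectors

# Older RDD-based API: LabeledPoint bundles a float label with its features.
oldStyle = LabeledPoint(0.5, [0.25])

# Newer DataFrame-based API: a plain (label, dense vector) tuple instead.
newStyle = (0.5, Vectors.dense(0.25))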
Next, we need to actually assign names to those columns. Here's the syntax for doing that:
colNames = ["label", "features"]
df = data.toDF(colNames)
We're going to tell MLlib that these two columns in the resulting RDD actually correspond to the label and the features, and then I can convert that RDD to a DataFrame object. At this point, I have an actual dataframe or, if you will, a Dataset that contains two columns, label and features, where the label is a floating point height, and the features column is a dense vector of floating point weights. That is the format required by MLlib, and MLlib can be pretty picky about this stuff, so it's important that you pay attention to these formats.
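If you want to sanity-check that conversion, printSchema and show are standard DataFrame methods for peeking at what you ended up with:

df.printSchema()  # should report a double label column and a vector features column
df.show(5)        # eyeball the first five rows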
Now, like I said, we're going to split our data in half.
trainTest = df.randomSplit([0.5, 0.5])
trainingDF = trainTest[0]
testDF = trainTest[1]
We're going to do a 50/50 split between training data and test data. This returns back two dataframes, one that I'm going to use to actually create my model, and one that I'm going to use to evaluate my model.
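One aside: randomSplit also accepts an optional seed if you want the same split on every run; the 42 here is just an arbitrary choice for illustration:

# Passing a seed makes the 50/50 split reproducible across runs.
trainTest = df.randomSplit([0.5, 0.5], seed=42)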
I will next create my actual linear regression model with a few standard parameters here that I've set.
from pyspark.ml.regression import LinearRegression

lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
We're going to call lir = LinearRegression, and then I will fit that model to the set of data that I held aside for training, the training data frame:
model = lir.fit(trainingDF)
That gives me back a model that I can use to make predictions from.
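If you're curious what line it actually learned, the fitted model exposes the learned slope and intercept directly:

print(model.coefficients)  # just one coefficient, since we have a single feature
print(model.intercept)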
Let's go ahead and do that.
fullPredictions = model.transform(testDF).cache()
I will call model.transform(testDF), and what that's going to do is predict the heights based on the weights in my testing Dataset. I actually have the known labels, the actual, correct heights, and this is going to add a new column to that dataframe called prediction, which has the predicted values based on that linear model.
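If you want to see that new column in place before we pull anything apart, a quick show does the trick:

fullPredictions.show(5)  # label, features, and the new prediction column together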
I'm going to cache those results, since we'll be pulling two columns out of them, and now I can just extract them and compare them together. So, let's pull out the prediction column, just using select like you would in SQL; then I'm going to take that dataframe, pull out its underlying RDD, and map it down to a plain old RDD of floating point values, the predicted heights in this case:
predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
These are the predicted heights. Next, we're going to get the actual heights from the label column:
labels = fullPredictions.select("label").rdd.map(lambda x: x[0])
Finally, we can zip them back together and just print them out side by side and see how well it does:
predictionAndLabel = predictions.zip(labels).collect()
for prediction in predictionAndLabel:
    print(prediction)
spark.stop()
This is kind of a convoluted way of doing it; I did it this way to be more consistent with the previous example. A simpler approach would be to select prediction and label together and map those two columns out of a single RDD, so there's no need to zip them up (sketched below); either way works. You'll also note that right at the end there we need to stop the Spark session.
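Here's what that simpler version might look like, as a sketch against the same fullPredictions dataframe:

# Pull both columns in one pass and map each row to a (prediction, label)
# pair; no zip required.
predictionAndLabel = fullPredictions.select("prediction", "label").rdd.map(lambda row: (row[0], row[1])).collect()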
So let's see if it works. Let's go up to Tools, Canopy Command Prompt, and we'll type in spark-submit SparkLinearRegression.py and let's see what happens.
There's a little bit more upfront time to actually run these APIs with Datasets, but once they get going, they're very fast. Alright, there you have it.
Here we have our actual and predicted values side by side, and you can see that they're not too bad; they tend to be more or less in the same ballpark. There you have it: a linear regression model in action using Spark 2.0 and the new DataFrame-based API for MLlib. More and more, you'll be using these APIs going forward with MLlib in Spark, so make sure you opt for them when you can. Alright, that's MLlib in Spark, a way of distributing massive computing tasks across an entire cluster for doing machine learning on big datasets. So, good skill to have. Let's move on.