Before running any data manipulation, we need to prepare the Spark environment as we did in the previous chapters. Let's start the Spark shell and request enough memory to process the downloaded dataset:
export SPARK_HOME="<path to your Spark 2.0 distribution>"
export SPARKLING_WATER_VERSION="2.1.12"
export SPARK_PACKAGES=\
"ai.h2o:sparkling-water-core_2.11:${SPARKLING_WATER_VERSION},\
ai.h2o:sparkling-water-repl_2.11:${SPARKLING_WATER_VERSION},\
ai.h2o:sparkling-water-ml_2.11:${SPARKLING_WATER_VERSION},\
com.packtpub:mastering-ml-w-spark-utils:1.0.0"
$SPARK_HOME/bin/spark-shell \
--master 'local[*]' \
--driver-memory 10g \
--executor-memory 10g \
--conf spark.executor.extraJavaOptions=-XX:MaxPermSize=384M \
--conf spark.driver.extraJavaOptions=-XX:MaxPermSize=384M \
--packages "$SPARK_PACKAGES" "$@"
sc.setLogLevel("WARN")
This command decreases the verbosity of the Spark output.
The next challenge is to read in the training dataset, which is composed of 25,000 movie reviews, 12,500 positive and 12,500 negative. The following lines of code will read in these files and then create our binary labels: 0 for a negative review and 1 for a positive review.
We directly use the textFile method exposed by the Spark sqlContext, which reads multiple files at once and returns a Dataset[String]. This differs from the approach in the previous chapters, which used the wholeTextFiles method and produced an RDD[String]:
val positiveReviews = spark.sqlContext.read.textFile("../data/aclImdb/train/pos/*.txt")
  .toDF("reviewText")
println(s"Number of positive reviews: ${positiveReviews.count}")
Number of positive reviews: 12500
We can directly show the first five rows using the dataset's show method (you can modify the truncate parameter to show the full text of each review):
println("Positive reviews:")
positiveReviews.show(5, truncate = true)
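For example, to read the full text of each review rather than a truncated preview, we can turn truncation off; this is simply a variation of the call above:

positiveReviews.show(5, truncate = false)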
Next, we will do the same thing for the negative reviews:
val negativeReviews = spark.sqlContext.read.textFile("../data/aclImdb/train/neg/*.txt")
  .toDF("reviewText")
println(s"Number of negative reviews: ${negativeReviews.count}")
Take a look at the following screenshot:
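Because the training dataset is balanced, the negative count should match the positive one (12,500 reviews). As an optional sanity check, which is not part of the original listing, we can verify this directly in the shell:

// Optional: both classes should contain the same number of reviews (12,500 each)
require(positiveReviews.count == negativeReviews.count,
  "Expected the same number of positive and negative reviews")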
Now, the positiveReviews and negativeReviews variables each hold a DataFrame of loaded reviews. Each row of the dataset contains a string representing a single review. However, we still need to generate the corresponding labels and merge both loaded datasets together.
The labeling is easy, since we loaded the negative and positive reviews as separate Spark DataFrames. We can directly append a constant column representing the label: 0 for negative reviews and 1 for positive reviews:
import org.apache.spark.sql.functions._
val pos = positiveReviews.withColumn("label", lit(1.0))
val neg = negativeReviews.withColumn("label", lit(0.0))
var movieReviews = pos.union(neg).withColumn("row_id", monotonically_increasing_id())
println("All reviews:")
movieReviews.show(5)
Take a look at the following screenshot:
In this case, we used the withColumn method, which appends a new column to an existing dataset. The new column definition, lit(1.0), creates a constant column from the numeric literal 1.0. We use a real number for the target value because the Spark API expects one. Finally, we merged both datasets using the union method.
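To confirm that the labels were assigned and merged as expected, we can look at the label distribution. This short check is not part of the original listing, but it only uses the movieReviews dataset defined above:

// Each label should account for 12,500 reviews
movieReviews.groupBy("label").count().show()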
We also appended the magic column row_id, which uniquely identifies each row in the dataset. This trick simplifies our workflow later when we need to join the output of several algorithms.
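As a sketch of how row_id can be used later, the following snippet joins the reviews with a tiny, purely hypothetical predictions DataFrame that stands in for the per-row output of some downstream algorithm (the prediction column and its values are assumptions for illustration only):

// Illustrative only: a stand-in for the per-row output of a downstream algorithm
val predictions = Seq((0L, 1.0), (1L, 0.0)).toDF("row_id", "prediction")

// The shared row_id column lets us join that output back to the original reviews
movieReviews.join(predictions, Seq("row_id"))
  .select("row_id", "label", "prediction")
  .show()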