Here, we can directly reuse the list of generic English stopwords provided by Spark. However, we can enrich it with our own specific stopwords:
import org.apache.spark.ml.feature.StopWordsRemover

val stopWords = StopWordsRemover.loadDefaultStopWords("english") ++ Array("ax", "arent", "re")
As stated earlier, this is an extremely delicate task and highly dependent on the business problem you are looking to solve. You may wish to add terms to this list that are common in your domain but will not help the prediction task.
Next, declare a tokenizer function that splits reviews into tokens and omits all stopwords and words that are too short:
val MIN_TOKEN_LENGTH = 3
val toTokens = (minTokenLen: Int, stopWords: Array[String], review: String) =>
  review.split("""\W+""")
    .map(_.toLowerCase.replaceAll("[^\\p{IsAlphabetic}]", ""))
    .filter(w => w.length > minTokenLen)
    .filter(w => !stopWords.contains(w))
Let's take a look at this function step by step to see what it's doing. It accepts a single review as input and then chains the following operations (a short usage sketch follows the list):
- .split("""\W+"""): This splits the movie review text into tokens on runs of non-word characters, so each resulting token consists of word characters (letters, digits, and underscores) only.
- .map(_.toLowerCase.replaceAll("[^\\p{IsAlphabetic}]", "")): As a best practice, we lowercase the tokens so that, at index time, Java = JAVA = java. However, this unification is not always desirable, and it's important to be aware of the implications lowercasing your text data can have on the model. For example, "Python", the programming language, lowercases to "python", which is also a snake. Clearly, the two tokens are not the same; however, lowercasing makes them so! This step also strips out all non-alphabetic characters, including digits.
- .filter(w => w.length > minTokenLen): Only keep those tokens whose length is greater than the specified limit (in our case, three characters).
- .filter(w => !stopWords.contains(w)): Using the stopwords list that we declared beforehand, we remove these terms from our tokenized data.
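Since toTokens is an ordinary Scala function, we can sanity-check it before wiring it into Spark. The following is only a minimal sketch with a made-up sample review; the exact output depends on Spark's default English stopword list.

// Quick check of the tokenizer on a hypothetical sample review
val sampleReview = "This movie was absolutely fantastic, 10/10!"
val sampleTokens = toTokens(MIN_TOKEN_LENGTH, stopWords, sampleReview)
println(sampleTokens.mkString(", "))
// Expected along the lines of: movie, absolutely, fantastic
// ("this" is dropped as a stopword, "was" is too short, and the digits are stripped entirely)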
We can now apply the defined function directly to the corpus of reviews:
import spark.implicits._
import org.apache.spark.sql.functions.udf

val toTokensUDF = udf(toTokens.curried(MIN_TOKEN_LENGTH)(stopWords))
movieReviews = movieReviews.withColumn("reviewTokens", toTokensUDF('reviewText))
In this case, we are marking the function toTokens as a Spark user-defined function by calling the udf marker, which exposes a common Scala function for use in the context of a Spark DataFrame. After that, we can apply the defined UDF directly to the reviewText column of the loaded dataset. The output of the function creates a new column called reviewTokens.
Furthermore, you can reuse the defined toTokens method across different projects, which do not necessarily have to be Spark-based.
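For example, nothing stops us from pre-binding the configuration with curried and using the resulting plain function outside of any DataFrame. The snippet below is only a sketch with made-up review strings:

// Hypothetical non-Spark reuse: fix the configuration once, reuse the function anywhere
val tokenize: String => Array[String] = toTokens.curried(MIN_TOKEN_LENGTH)(stopWords)

val tokenized = Seq(
  "An unexpectedly touching film",
  "Terrible acting, great soundtrack"
).map(tokenize)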
The following code finds all the rare tokens:
val RARE_TOKEN = 2
val rareTokens= movieReviews.select("reviewTokens")
.flatMap(r =>r.getAs[Seq[String]]("reviewTokens"))
.map((v:String) => (v, 1))
.groupByKey(t => t._1)
.reduceGroups((a,b) => (a._1, a._2 + b._2))
.map(_._2)
.filter(t => t._2 <RARE_TOKEN)
.map(_._1)
.collect()
Computing the rare tokens is a multi-step operation. In our example, the input is represented by rows containing a list of tokens; however, we need all the unique tokens and their occurrence counts.

Therefore, we flatten the structure into a new dataset in which each row represents a single token, using the flatMap method.

Then, we can use the same strategy that we used in the previous chapters: we generate a key-value pair (word, 1) for each word, where each pair represents a single occurrence of the given word. Next, we group all the pairs with the same word together (the groupByKey method) and compute the total number of occurrences of the word representing each group (reduceGroups). The remaining steps keep only the tokens that occur fewer than RARE_TOKEN times and finally collect the result as a list of rare words.
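To make the pattern concrete, here is a minimal, self-contained sketch of the same counting logic on a toy Dataset of token lists (hypothetical data; it assumes an active SparkSession called spark):

import spark.implicits._

// Toy Dataset: each row is a list of tokens, just like the reviewTokens column
val toyTokens = Seq(Seq("great", "plot"), Seq("great", "cast")).toDS()

val counts = toyTokens
  .flatMap(tokens => tokens)                   // one row per token
  .map(token => (token, 1))                    // key-value pair (word, 1)
  .groupByKey(_._1)                            // group the pairs by the word
  .reduceGroups((a, b) => (a._1, a._2 + b._2)) // sum the occurrences per word
  .map(_._2)                                   // drop the grouping key
  .collect()
// counts holds pairs such as ("great", 2), ("plot", 1), ("cast", 1)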
Our goal is to find the rare tokens. In our example, we consider each token that occurs fewer than RARE_TOKEN (two) times, that is, only once, as rare:
println(s"Rare tokens count: ${rareTokens.size}")
println(s"Rare tokens: ${rareTokens.take(10).mkString(", ")}")
The output is as follows:
Now that we have our tokenization function, it is time to filter out the rare tokens by defining another Spark UDF, which we will apply directly to the reviewTokens input column:
val rareTokensFilter = (rareTokens: Array[String], tokens: Seq[String]) => tokens.filter(token => !rareTokens.contains(token))

val rareTokensFilterUDF = udf(rareTokensFilter.curried(rareTokens))

movieReviews = movieReviews.withColumn("reviewTokens", rareTokensFilterUDF('reviewTokens))

println("Movie reviews tokens:")
movieReviews.show(5)
The movie review tokens are as follows:
Depending on your particular task, you may wish to add or remove some stopwords, or explore different regular expression patterns; teasing out email addresses with regular expressions, for example, is quite common (see the sketch below). For now, we will take the tokens that we have and use them to build our dataset.
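As an illustration of such a pattern, the following sketch uses a deliberately simple (and by no means complete) regular expression to pull email-like tokens out of a made-up string:

// Hypothetical sketch: a loose pattern for spotting email-like tokens
// before the \W+ split would break them apart
val emailPattern = """[\w.+-]+@[\w-]+\.[\w.]+""".r

val text = "Contact me at reviewer@example.com for details"
val emails = emailPattern.findAllIn(text).toList
// List(reviewer@example.com)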