The emp_title column transformation

The first column, emp_title, describes the employment title. However, it is not unified: there are multiple versions with the same meaning ("Bank of America" versus "bank of america") or a similar meaning ("AT&T" and "AT&T Mobility"). Our goal is to unify the labels into a basic form, detect similar labels, and replace them with a common title. The theory is that the employment title has a direct impact on the ability to pay back the loan.

The basic unification of labels is a simple task: transform the labels into lowercase and drop all non-alphanumeric characters (such as "&" or "."). For this step, we will use the Spark API for user-defined functions:

import org.apache.spark.sql.functions.udf

// Lowercase the title and drop everything except word characters and spaces.
val unifyTextColumn = (in: String) => {
  if (in != null) in.toLowerCase.replaceAll("[^\\w ]", "") else null
}
val unifyTextColumnUdf = udf(unifyTextColumn)

The next step defines a tokenizer, a function that splits a sentence into individual tokens and drops useless words and stop words (for example, words that are too short, or conjunctions). In our case, we will make the minimal token length and the list of stop words flexible input parameters:

val ALL_NUM_REGEXP = java.util.regex.Pattern.compile("\\d*")
val tokenizeTextColumn = (minLen: Int) => (stopWords: Array[String]) => (w: String) => {
  if (w != null)
    w.split(" ").map(_.trim)
      .filter(_.length >= minLen)
      .filter(!ALL_NUM_REGEXP.matcher(_).matches())
      .filter(!stopWords.contains(_))
      .toSeq
  else
    Seq.empty[String]
}
import org.apache.spark.ml.feature.StopWordsRemover
val tokenizeUdf = udf(tokenizeTextColumn(3)(StopWordsRemover.loadDefaultStopWords("english")))

It is important to mention that the Spark API already provides a list of stop words as part of the StopWordsRemover transformer. Our definition of tokenizeUdf directly uses the provided list of English stop words.
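
As a quick sanity check, we can peek at the list that loadDefaultStopWords returns; this is just a small sketch, and the exact content of the list depends on your Spark version:

// Inspect the built-in English stop-word list used by tokenizeUdf.
val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
println(s"Number of default English stop words: ${englishStopWords.length}")
println(englishStopWords.take(10).mkString(", "))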

Now, it is time to look at the column in more detail. We will start from the already created DataFrame, loanStatusBaseModelDf, and apply the two functions defined earlier to the emp_title column:

val empTitleColumn = loanStatusBaseModelDf
  .withColumn("emp_title", unifyTextColumnUdf($"emp_title"))
  .withColumn("emp_title_tokens", tokenizeUdf($"emp_title"))

Now, we have a Spark DataFrame with two important columns: the first contains the unified emp_title values and the second holds the corresponding lists of tokens. With the help of the Spark SQL API, we can easily compute the number of unique values in the emp_title column, or the number of unique tokens with a frequency higher than 100 (that is, tokens that occur more than 100 times across the emp_title values):

println("Number of unique values in emp_title column: " +
empTitleColumn.select("emp_title").groupBy("emp_title").count().count())
println("Number of unique tokens with freq > 100 in emp_title column: " +
empTitleColumn.rdd.flatMap(row => row.getSeq[String](1).map(w => (w, 1)))
.reduceByKey(_ + _).filter(_._2 >100).count)

The output is as follows:

You can see that there are many unique values in the emp_title column. On the other hand, there are only 717 tokens that are repeated over and over. Our goal is to compress the number of unique values in the column and group similar values together. We can experiment with different methods: for example, encode each emp_title by a representative token, or use a more advanced technique based on the Word2Vec algorithm.

In the preceding code, we combined the query capabilities of DataFrames with the computational power of raw RDDs. Many queries can be expressed with the powerful SQL-based DataFrame API; however, if we need to process structured data, such as the sequences of string tokens in the preceding example, the RDD API is often the quicker way to go.
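
For comparison, the token-frequency part of the query can also be expressed purely with the DataFrame API by exploding the token column; the following is a sketch that should yield the same count as the RDD-based version:

import org.apache.spark.sql.functions.explode

// Explode the token arrays into one row per token occurrence,
// count the occurrences, and keep only tokens seen more than 100 times.
val frequentTokens = empTitleColumn
  .select(explode($"emp_title_tokens").as("token"))
  .groupBy("token")
  .count()
  .filter($"count" > 100)
println("Number of unique tokens with freq > 100: " + frequentTokens.count())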

Let's look at the second option. The Word2Vec algorithm transforms text features into a vector space in which similar words lie close together with respect to the cosine distance of their corresponding vectors. That is a nice property; however, we still need to detect "groups of similar words". For this task, we can simply use the K-means algorithm.

The first step is to create the Word2Vec model. Since we have data in a Spark DataFrame, we will simply use the Spark implementation from the ml package:

import org.apache.spark.ml.feature.Word2Vec
val empTitleW2VModel = new Word2Vec()
  .setInputCol("emp_title_tokens")
  .setOutputCol("emp_title_w2vVector")
  .setMinCount(1)
  .fit(empTitleColumn)

The input of the algorithm is the sequence of tokens representing a sentence, stored in the emp_title_tokens column. The outputCol parameter defines the name of the output column when the model is used to transform data:

val empTitleColumnWithW2V = empTitleW2VModel.transform(empTitleColumn)
empTitleColumnWithW2V.printSchema()

The output is as follows:

From the output of the transformation, you can see that the DataFrame contains not only the emp_title and emp_title_tokens input columns, but also the new emp_title_w2vVector column, which represents the output of the empTitleW2VModel transformation.

It is important to mention that the Word2Vec algorithm is defined only for individual words; however, the Spark implementation also transforms a sentence (that is, a sequence of words) into a vector by averaging the vectors of all the words the sentence contains.
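
To get a feeling for the learned vector space, we can ask the model for the tokens closest to a given word; this is a small sketch, and the token "engineer" is only an example that has to be present in the model's vocabulary:

// Show the five tokens whose vectors are closest to "engineer" (by cosine similarity).
empTitleW2VModel.findSynonyms("engineer", 5).show()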

In the next step, we will build a K-means model to partition the vector space representing the individual employment titles into a predefined number of clusters. Before doing this, it is important to think about why this would be a good thing to do in the first place. Think about the many different variations of saying "Software Engineer" that you know of: Programmer Analyst, SE, Senior Software Engineer, and so on. Given that these variations all essentially mean the same thing and will be represented by similar vectors, clustering provides us with a means of grouping similar titles together. However, we need to specify how many clusters, K, we should detect; this needs more experimentation, but for simplicity, we will try 500 clusters:

import org.apache.spark.ml.clustering.KMeans
val K = 500
val empTitleKmeansModel = new KMeans()
  .setFeaturesCol("emp_title_w2vVector")
  .setK(K)
  .setPredictionCol("emp_title_cluster")
  .fit(empTitleColumnWithW2V)

The model allows us to transform the input data and explore the clusters. The cluster number is stored in a new column called emp_title_cluster.

Specifying the number of clusters is tricky, given that we are dealing with the unsupervised world of machine learning. Often, practitioners use a simple heuristic known as the elbow method (refer to the following link: https://en.wikipedia.org/wiki/Determining_the_number_of_clusters_in_a_data_set), which runs many K-means models with an increasing number of clusters K and measures the heterogeneity (within-cluster variance) of the resulting clusters. Usually, there is a diminishing gain as we increase K, and the trick is to find the point at which the improvement becomes so marginal that it is no longer "worth" the additional run time.
Alternatively, there are information criteria statistics known as AIC (Akaike Information Criterion) (https://en.wikipedia.org/wiki/Akaike_information_criterion) and BIC (Bayesian Information Criterion) (https://en.wikipedia.org/wiki/Bayesian_information_criterion) that interested readers should look into for further insight. Note that, at the time of writing this book, Spark has not yet implemented these information criteria, and hence, we will not cover them in more detail.
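
For illustration, a minimal sketch of the elbow heuristic could look as follows; it assumes Spark 2.x, where KMeansModel exposes the computeCost method, and the candidate values of K are chosen arbitrarily:

// Train K-means for several candidate values of K and report the
// within-cluster sum of squared errors; look for the "elbow" in the curve.
for (k <- Seq(100, 250, 500, 750, 1000)) {
  val model = new KMeans()
    .setFeaturesCol("emp_title_w2vVector")
    .setK(k)
    .fit(empTitleColumnWithW2V)
  println(s"K = $k, within-cluster SSE = ${model.computeCost(empTitleColumnWithW2V)}")
}
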
Take a look at the following code snippet:
val clustered = empTitleKmeansModel.transform(empTitleColumnWithW2V)
clustered.printSchema()

The output is as follows:

Additionally, we can explore words associated with a random cluster:

println(
s"""Words in cluster '133':
|${clustered.select("emp_title").where("emp_title_cluster = 133").take(10).mkString(", ")}
|""".stripMargin)

The output is as follows:

Look at the preceding cluster and ask yourself, "Do these titles seem like a logical cluster?" Perhaps more training is required, or perhaps we need to consider further feature transformations, such as extracting n-grams, which identify sequences of words that occur together with a high frequency. Interested readers can check out the NGram feature transformer in the Spark ML documentation; a small sketch follows.
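
As a rough sketch of what such a transformation could look like, Spark ML's NGram transformer turns the token sequences into, for example, bigrams (the output column name here is only illustrative):

import org.apache.spark.ml.feature.NGram

// Produce bigrams from the token column; frequent bigrams such as
// "software engineer" could then serve as additional features.
val bigramTransformer = new NGram()
  .setN(2)
  .setInputCol("emp_title_tokens")
  .setOutputCol("emp_title_bigrams")
bigramTransformer.transform(empTitleColumn).select("emp_title_bigrams").show(5, truncate = false)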

Furthermore, the emp_title_cluster column defines a new feature that we will use to replace the original emp_title column. We also need to remember all the steps and models that we used during the preparation of the column, since we will need to reproduce them to enrich new data. For this purpose, we define a Spark pipeline:

import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.types._

val empTitleTransformationPipeline = new Pipeline()
  .setStages(Array(
    new UDFTransformer("unifier", unifyTextColumn, StringType, StringType)
      .setInputCol("emp_title").setOutputCol("emp_title_unified"),
    new UDFTransformer("tokenizer",
                       tokenizeTextColumn(3)(StopWordsRemover.loadDefaultStopWords("english")),
                       StringType, ArrayType(StringType, true))
      .setInputCol("emp_title_unified").setOutputCol("emp_title_tokens"),
    empTitleW2VModel,
    empTitleKmeansModel,
    new ColRemover()
      .setKeep(false)
      .setColumns(Array("emp_title", "emp_title_unified", "emp_title_tokens", "emp_title_w2vVector"))
  ))

The first two pipeline stages represent the application of our user-defined functions. We used the same trick as in Chapter 4, Predicting Movie Reviews Using NLP and Spark Streaming, to wrap a UDF into a Spark pipeline transformer with the help of the defined UDFTransformer class. The remaining stages represent the models that we built.

The defined UDFTransformer class is a nice way to wrap a UDF into a Spark pipeline transformer, but to Spark it is a black box, so the optimizer cannot apply its powerful query transformations to it. It could, however, be replaced with the existing Spark SQLTransformer, which the Spark optimizer does understand; on the other hand, its usage is not as straightforward.
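
For example, the unification step could be expressed as a SQLTransformer roughly as follows; this is only a sketch that covers the lowercasing and character cleanup, not the tokenization:

import org.apache.spark.ml.feature.SQLTransformer

// Express the text unification as a SQL statement over the stage input
// (__THIS__ is the placeholder for the transformer's input DataFrame).
val unifierSqlTransformer = new SQLTransformer().setStatement(
  """SELECT *, regexp_replace(lower(emp_title), '[^a-z0-9_ ]', '') AS emp_title_unified
    |FROM __THIS__""".stripMargin)

Such a stage could then stand in for the first UDFTransformer in the pipeline, although the tokenization step would still require a UDF.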

The pipeline still needs to be fit; however, in our case, since we used only Spark transformers, the fit operation bundles all the defined stages into the pipeline model:

val empTitleTransformer = empTitleTransformationPipeline.fit(loanStatusBaseModelDf)

Now, it is time to evaluate the impact of the new feature on the model quality. We will repeat the same steps we did earlier during the evaluation of the quality of the base model:

  • Prepare training and validation parts and enrich them with a new feature, emp_title_cluster.
  • Build a model.
  • Compute the total money loss and find the minimal loss.

For the first step, we will reuse the prepared train and validation parts; however, we need to transform them with the prepared pipeline and drop the "raw" column, desc:

val trainLSBaseModel3Df = empTitleTransformer.transform(loanStatusDfSplits(0))
val validLSBaseModel3Df = empTitleTransformer.transform(loanStatusDfSplits(1))
val trainLSBaseModel3Hf = toHf(trainLSBaseModel3Df.drop("desc"), "trainLSBaseModel3Hf")(h2oContext)
val validLSBaseModel3Hf = toHf(validLSBaseModel3Df.drop("desc"), "validLSBaseModel3Hf")(h2oContext)

When we have the data ready, we can repeat the model training with the same parameters we used for the base model training, except that we use the prepared input training part:

loanStatusBaseModelParams._train = trainLSBaseModel3Hf._key
val loanStatusBaseModel3 = new DRF(loanStatusBaseModelParams, water.Key.make[DRFModel]("loanStatusBaseModel3"))
  .trainModel()
  .get()

Finally, we can evaluate the model on the validation data and compute our evaluation metrics based on the total money loss:

val minLossModel3 = findMinLoss(loanStatusBaseModel3, validLSBaseModel3Hf, DEFAULT_THRESHOLDS)
println(f"Min total loss for model 3: ${minLossModel3._2}%,.2f (threshold = ${minLossModel3._1})")

The output is shown here:

We can see that employing an NLP technique to detect similar job titles slightly improves the quality of the model, decreasing the total dollar loss computed on the unseen data. However, the question is whether we can improve our model even further based on the desc column, which could also contain useful information.
