One-hot encoding categorical features

As in the previous chapter, we need to encode the categorical features into sets of binary features by executing the following steps:

  1. In our case, the categorical features include the following (df_train.columns returns a plain Python list, which is why we can call remove on it in place):
>>> categorical = df_train.columns
>>> categorical.remove('label')
>>> print(categorical)
['C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']

In PySpark, one-hot encoding is not as straightforward as it is in scikit-learn (specifically, with the OneHotEncoder module).

  2. We first need to index each categorical column using the StringIndexer module:
>>> from pyspark.ml.feature import StringIndexer
>>> indexers = [
...     StringIndexer(inputCol=c,
...                   outputCol="{0}_indexed".format(c)).setHandleInvalid("keep")
...     for c in categorical
... ]

The setHandleInvalid("keep") setting makes sure the indexer won't crash if a new categorical value occurs at transformation time; unseen values are put into an extra bucket instead. Try omitting it and you will see error messages about unknown values.
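To see what StringIndexer does before applying it to the real data, here is a minimal sketch on toy data (the toy DataFrame and its column name are made up for illustration, and we assume the spark session created earlier):

>>> # StringIndexer maps each distinct string to a numeric index, ordered
>>> # by descending frequency: 'b' occurs twice, so it becomes 0.0; 'a' 1.0
>>> toy = spark.createDataFrame([('a',), ('b',), ('b',)], ['cat'])
>>> toy_model = StringIndexer(
...     inputCol='cat', outputCol='cat_indexed'
... ).setHandleInvalid('keep').fit(toy)
>>> toy_model.transform(toy).show()
>>> # Thanks to handleInvalid='keep', a value unseen during fitting ('c')
>>> # goes to an extra bucket with index 2.0 instead of raising an error
>>> toy_model.transform(spark.createDataFrame([('c',)], ['cat'])).show()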

  3. Then, we perform one-hot encoding on each individual indexed categorical column using the OneHotEncoderEstimator module:
>>> from pyspark.ml.feature import OneHotEncoderEstimator
>>> encoder = OneHotEncoderEstimator(
...     inputCols=[indexer.getOutputCol() for indexer in indexers],
...     outputCols=["{0}_encoded".format(indexer.getOutputCol())
...                 for indexer in indexers]
... )
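As a side note, OneHotEncoderEstimator is the Spark 2.x API; it was renamed OneHotEncoder in Spark 3.0. The following minimal sketch (toy data with a made-up column name) shows what the encoder produces:

>>> # With the default dropLast=True, n categories become a sparse binary
>>> # vector of size n - 1: 0.0 -> (2,[0],[1.0]), 1.0 -> (2,[1],[1.0]),
>>> # and the last category, 2.0, becomes the all-zero vector (2,[],[])
>>> toy_idx = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ['cat_indexed'])
>>> toy_enc = OneHotEncoderEstimator(inputCols=['cat_indexed'],
...                                  outputCols=['cat_indexed_encoded'])
>>> toy_enc.fit(toy_idx).transform(toy_idx).show()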
  4. Next, we concatenate all sets of generated binary vectors into a single one using the VectorAssembler module:
>>> from pyspark.ml.feature import VectorAssembler
>>> assembler = VectorAssembler(
...     inputCols=encoder.getOutputCols(),
...     outputCol="features"
... )

This creates the final encoded vector column called features.
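For intuition, here is a minimal sketch (toy data with made-up column names) of how VectorAssembler concatenates vector columns:

>>> # Two 2-element vectors become one 4-element vector [1.0,0.0,0.0,1.0]
>>> # in the new 'features' column
>>> from pyspark.ml.linalg import Vectors
>>> toy_vec = spark.createDataFrame(
...     [(Vectors.dense([1.0, 0.0]), Vectors.dense([0.0, 1.0]))],
...     ['v1', 'v2'])
>>> VectorAssembler(inputCols=['v1', 'v2'],
...                 outputCol='features').transform(toy_vec).show()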

  5. We chain all three stages together into a pipeline with the Pipeline module in PySpark, which better organizes our one-hot encoding workflow. When fitted, the pipeline runs its stages in order: each StringIndexer is fitted and applied, then the encoder, then the assembler:
>>> from pyspark.ml import Pipeline
>>> stages = indexers + [encoder, assembler]
>>> pipeline = Pipeline(stages=stages)
  6. Now, we fit the one-hot encoding pipeline model on the training set. Note that the category mappings are learned from the training set only; the same fitted model will be reused on the testing set:
>>> one_hot_encoder = pipeline.fit(df_train)
  7. Once this is done, we use the trained encoder to transform both the training and testing sets. For the training set, we use the following code:
>>> df_train_encoded = one_hot_encoder.transform(df_train)
>>> df_train_encoded.show()

We skip displaying the results here, as there are dozens of columns: everything in df_train plus the additional indexed and encoded columns.

  8. However, we can see the column we are looking for, the features column, which contains the one-hot encoded results. Hence, we select only this column along with the target variable:
>>> df_train_encoded = df_train_encoded.select(
...     ["label", "features"])

>>> df_train_encoded.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0|(31458,[5,7,3527,...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1532,...|
| 0|(31458,[5,7,4366,...|
| 0|(31458,[5,7,14,45...|
+-----+--------------------+
only showing top 20 rows

The features column contains sparse vectors of size 31,458.
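If the notation is unfamiliar, (31458,[5,7,...],...) is Spark's display format for a sparse vector: (size, [indices], [values]). Here is a quick sketch (the indices 5 and 7 are simply the two visible in the truncated output above):

>>> # A sparse vector stores only the positions and values of its non-zero
>>> # entries; any index not listed holds 0.0
>>> from pyspark.ml.linalg import SparseVector
>>> v = SparseVector(31458, [5, 7], [1.0, 1.0])
>>> v[5]  # returns 1.0, a listed index
>>> v[6]  # returns 0.0, an unlisted index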

  9. Don't forget to cache df_train_encoded, as we will be using it to iteratively train our classification model:
>>> df_train_encoded.cache()
DataFrame[label: int, features: vector]
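Note that cache() is lazy: the data is materialized in memory only by the first action that computes it. If you prefer to pay that cost up front rather than during the first training iteration, one option is to trigger an action right away:

>>> # Any action works; count() scans the full DataFrame and thereby
>>> # populates the cache before model training starts
>>> df_train_encoded.count()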
  10. To release some space, we uncache df_train, since we will no longer need it:
>>> df_train.unpersist()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
  11. Now, we repeat the preceding steps for the testing set. Because we reuse the encoder fitted on the training set, the testing set is mapped into the same 31,458-dimensional feature space:
>>> df_test_encoded = one_hot_encoder.transform(df_test)
>>> df_test_encoded = df_test_encoded.select(["label", "features"])
>>> df_test_encoded.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,2859,...|
| 0|(31458,[1,7,651,4...|
+-----+--------------------+
only showing top 20 rows
>>> df_test_encoded.cache()
DataFrame[label: int, features: vector]
>>> df_test.unpersist()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
  12. If you check the Spark UI at localhost:4040/jobs/ in your browser, you will see the jobs completed so far.
