Composing all transformations together

In the previous section, we developed individual steps using Spark primitives, that is, UDFs, native Spark algorithms, and H2O algorithms. However, to invoke all these transformation on unseen data requires a lot of manual effort. Hence, Spark introduces the concept of pipelines, mainly motivated by Python scikit pipelines (http://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html).

To learn more about the design decision behind Python, we recommend that you read the excellent paper "API design for machine learning software: experiences from the scikit-learn project" by Lars Buitinck et al (https://arxiv.org/abs/1309.0238).

The pipeline is composed of stages that are represented by estimators and transformations:

  • Estimators: These are the core elements that expose a fit method that creates a model. Most of the classification and regression algorithms are represented as an estimator.
  • Transformers: These transform an input dataset into a new dataset. The transformers expose the method transform, which implements the logic of transformation. The transformers can produce single on multiple vectors. Most of the models produced by estimators are transformers-they transform an input dataset into a new dataset representing the prediction. Another example can be the TF transformer used in this section.

The pipeline itself exposes the same interface as the estimator. It has the fit method, so it can be trained and produces a "pipeline model", which can be used for data transformation (it has the same interface as transformers). Hence, the pipelines can be combined hierarchically together. Furthermore, the individual pipeline stages are invoked in a sequential order; however, they can still represent a directed acyclic graph (for example, a stage can have two input columns, each produced by a different stage). In this case, the sequential order has to follow the topological ordering of the graph.

In our example, we will compose all the transformation together. However, we will not define a training pipeline (that is, a pipeline that will train all the models), but we will use the already trained models to set up the pipeline stages. Our motivation is to define a pipeline that we can use to score a new movie review.

So, let's start from the beginning of our example-the first operation that we applied on the input data was a simple tokenizer. It was defined by a Scala function that we wrapped into a form of Spark UDF. However, to use it as a part of the pipeline we need to wrap the defined Scala function into a transformation. Spark does not provide any simple wrapper to do that, so it is necessary to define a generic transformation from scratch. We know that we will transform a single column into a new column. In this case, we can use UnaryTransformer, which exactly defines one-to-one column transformation. We can be a little bit more generic and define a generic wrapper for Scala functions (aka Spark UDFs):

import org.apache.spark.ml.{Pipeline, UnaryTransformer} 
import org.apache.spark.sql.types._ 
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{MLWritable, MLWriter} class UDFTransformer[T, U](override valuid: String, f: T =>U, inType: DataType,
outType: DataType) extendsUnaryTransformer[T, U, UDFTransformer[T, U]] with MLWritable { override protected defcreateTransformFunc: T =>U = f override protected defvalidateInputType(inputType: DataType): Unit = require(inputType == inType) override protected defoutputDataType: DataType = outType override defwrite: MLWriter = new MLWriter { override protected defsaveImpl(path: String): Unit = {} } }

The UDFTransformer class wraps a function f, which accepts a generic type T, and produces type U. At the Spark dataset level, it transforms an input column (see UnaryTransformer) of type inType into a new output column (again, the field is defined by UnaryTransformer) of the outType type. The class also has a dummy implementation of the trait MLWritable, which supports serialization of the transformer into a file.

Now, we just need to define our tokenizer transformer:

val tokenizerTransformer= new UDFTransformer[String, Array[String]](
"tokenizer", toTokens.curried(MIN_TOKEN_LENGTH)(stopWords),
StringType, new ArrayType(StringType, true))

The defined transformer accepts a string column (that is, a movie review) and produces a new column that contains an array of strings representing movie review tokens. The transformer is directly using the toTokens function, which we used at the beginning of the chapter.

The next transformation should remove rare words. In this case, we will use a similar approach as in the previous step and utilize the defined UDFTransformer function:

val rareTokensFilterTransformer= new UDFTransformer[Seq[String], Seq[String]](
"rareWordsRemover",
rareTokensFilter.curried(rareTokens),
newArrayType(StringType, true), new ArrayType(StringType, true))

This transformer accepts a column containing an array of tokens and produces a new column containing a filtered array of tokens. It is using the already defined rareTokensFilter Scala function.

So far, we have not specified any input data dependencies, including names of input columns. We will keep it for the final pipeline definition.

The next steps include vectorization with the help of the TF method hashing string tokens into a large numeric space and followed by a transformation based on the built IDF model. Both transformations are already defined in the expected form-the first hashingTF transformation is already a transformer translating a set of tokens into numeric vectors, the second one idfModel accepts the numeric vector and scales it based on the computed coefficients.

These steps provide input for the trained binomial models. Each base model represents a transformer producing several new columns such as prediction, raw prediction, and probabilities. However, it is important to mention that not all models provide the full set of columns. For example, Spark GBM currently (Spark version 2.0.0) provides only the prediction column. Nevertheless, it is good enough for our example.

After generating predictions, our dataset contains many columns; for example, input columns, columns with tokens, transformed tokens, and so on. However, to apply the generated meta-learner, we need only columns with prediction generated by the base models. Hence, we will define a column selector transformation, which drops all the unnecessary columns. In this case, we have a transformation-accepting dataset with N-columns and producing a new dataset with M-columns. Therefore, we cannot use UnaryTransformer defined earlier, and we need to define a new ad-hoc transformation called ColumnSelector:

import org.apache.spark.ml.Transformer 
class ColumnSelector(override valuid: String, valcolumnsToSelect: Array[String]) extends Transformer with MLWritable { 
 
  override deftransform(dataset: Dataset[_]): DataFrame= { 
    dataset.select(columnsToSelect.map(dataset.col): _*) 
  } 
 
  override deftransformSchema(schema: StructType): StructType = { 
    StructType(schema.fields.filter(col=>columnsToSelect
.contains(col.name))) } override defcopy(extra: ParamMap): ColumnSelector = defaultCopy(extra) override defwrite: MLWriter = new MLWriter { override protected defsaveImpl(path: String): Unit = {} } }

ColumnSelector represents a generic transformer that selects only the given columns from the input dataset. It is important to mention the overall two-stages concept-the first stage transforms the schema (that is, the metadata associated with each dataset) and the second transforms the actual dataset. The separation allows Spark to invoke early checks on transformers to find incompatibilities before invoking actual data transformations.

We need to define the actual column selector transformer by creating an instance of columnSelector-be aware of specifying the right columns to keep:

val columnSelector= new ColumnSelector( 
  "columnSelector",  Array(s"DT_${dtModel.getPredictionCol}", 
  s"NB_${nbModel.getPredictionCol}", 
  s"RF_${rfModel.getPredictionCol}", 
  s"GBM_${gbmModel.getPredictionCol}") 

At this point, our transformers are ready to be composed into the final "super-learning" pipeline. The API of the pipeline is straightforward-it accepts individual stages that are invoked sequentially. However, we still need to specify dependencies between individual stages. Mostly the dependency is described by input and output column names:

val superLearnerPipeline = new Pipeline() 
 .setStages(Array( 
// Tokenize 
tokenizerTransformer 
     .setInputCol("reviewText") 
     .setOutputCol("allReviewTokens"), 
// Remove rare items 
rareTokensFilterTransformer 
     .setInputCol("allReviewTokens") 
     .setOutputCol("reviewTokens"), 
hashingTF, 
idfModel, 
dtModel 
     .setPredictionCol(s"DT_${dtModel.getPredictionCol}") 
     .setRawPredictionCol(s"DT_${dtModel.getRawPredictionCol}") 
     .setProbabilityCol(s"DT_${dtModel.getProbabilityCol}"), 
nbModel 
     .setPredictionCol(s"NB_${nbModel.getPredictionCol}") 
     .setRawPredictionCol(s"NB_${nbModel.getRawPredictionCol}") 
     .setProbabilityCol(s"NB_${nbModel.getProbabilityCol}"), 
rfModel 
     .setPredictionCol(s"RF_${rfModel.getPredictionCol}") 
     .setRawPredictionCol(s"RF_${rfModel.getRawPredictionCol}") 
     .setProbabilityCol(s"RF_${rfModel.getProbabilityCol}"), 
gbmModel// Note: GBM does not have full API of PredictionModel 
.setPredictionCol(s"GBM_${gbmModel.getPredictionCol}"), 
columnSelector, 
metaLearningModel 
 )) 

There are a few important concepts worth mentioning:

  • The tokenizerTransformer and rareTokensFilterTransformer are connected via the column allReviewTokens-the first one is the column producer, and the second one is the column consumer.
  • The dtModel, nbModel, rfModel, and gbmModel models have the same input column defined as idf.getOutputColumn. In this case, we have effectively used computation DAG, which is topologically ordered into a sequence
  • All the models have the same output columns (with some exceptions, in the case of GBM), which cannot be appended into the resulting dataset all together since the pipeline expects unique names of columns. Hence, we need to rename the output columns of the models by calling setPredictionCol, setRawPredictionCol, and setProbabilityCol. It is important to mention that the GBM does not produce raw prediction and probabilities columns right now.

Now, we can fit the pipeline to get the pipeline model. This is, in fact, an empty operation, since our pipeline is composed only of transformers. However, we still need to call the fit method:

val superLearnerModel= superLearnerPipeline.fit(pos)

Voila, we have our super-learner model, composed of multiple Spark models and orchestrated by the H2O deep learning model. It is time to use the model to make a prediction!

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.47.130