In the previous sections, we explored different data processing steps and built and evaluated several models to predict the loan status and interest rate of accepted loans. Now, it is time to take all the artifacts we have built and compose them to score new loans.
There are multiple steps that we need to consider:
- Data cleanup
- The emp_title column preparation pipeline
- The desc column transformation into a vector representing significant words
- The binomial model to predict loan acceptance status
- The regression model to predict loan interest rate
To reuse these steps, we need to connect them into a single function that accepts input data and produces predictions involving loan acceptance status and interest rate.
The scoring function is straightforward: it replays all the steps that we performed in the previous chapters:
import _root_.hex.tree.drf.DRFModel
import _root_.hex.tree.gbm.GBMModel

def scoreLoan(df: DataFrame,
              empTitleTransformer: PipelineModel,
              loanStatusModel: DRFModel,
              goodLoanProbThreshold: Double,
              intRateModel: GBMModel)(h2oContext: H2OContext): DataFrame = {
  val inputDf = empTitleTransformer.transform(basicDataCleanup(df))
    .withColumn("desc_denominating_words", descWordEncoderUdf(col("desc")))
    .drop("desc")
  val inputHf = toHf(inputDf, "input_df_" + df.hashCode())(h2oContext)
  // Predict loan status and interest rate
  val loanStatusPrediction = loanStatusModel.score(inputHf)
  val intRatePrediction = intRateModel.score(inputHf)
  val probGoodLoanColName = "good loan"
  val inputAndPredictionsHf = loanStatusPrediction.add(intRatePrediction).add(inputHf)
  inputAndPredictionsHf.update()
  // Prepare the field loan_status based on the given threshold
  val loanStatus = (threshold: Double) => (predGoodLoanProb: Double) =>
    if (predGoodLoanProb < threshold) "bad loan" else "good loan"
  val loanStatusUdf = udf(loanStatus(goodLoanProbThreshold))
  h2oContext.asDataFrame(inputAndPredictionsHf)(df.sqlContext)
    .withColumn("loan_status", loanStatusUdf(col(probGoodLoanColName)))
}
We use all the definitions that we prepared earlier (the basicDataCleanup method, empTitleTransformer, loanStatusModel, and intRateModel) and apply them in the corresponding order. For example, we can score the input data in the following way:
val prediction = scoreLoan(loanStatusDfSplits(0),
                           empTitleTransformer,
                           loanStatusBaseModel4,
                           minLossModel4._4,
                           intRateModel)(h2oContext)
prediction.show(10)
The call displays the first ten rows of the scored data, including the predicted loan_status column.
However, to score new loans independently of our training code, we still need to export the trained models and pipelines in some reusable form. For Spark models and pipelines, we can directly use Spark serialization. For example, the defined empTitleTransformer can be exported in this way:
import java.io.{File, FileOutputStream}

val MODELS_DIR = sys.env.getOrElse("MODELSDIR", "models")
val destDir = new File(MODELS_DIR)
empTitleTransformer.write.overwrite.save(new File(destDir, "empTitleTransformer").getAbsolutePath)
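Later, the exported pipeline can be restored with Spark's standard PipelineModel.load method. The following is a minimal sketch; the newLoansDf variable is a hypothetical placeholder for freshly arrived data:
import org.apache.spark.ml.PipelineModel

// Load the persisted emp_title pipeline back from the models directory
val restoredEmpTitleTransformer =
  PipelineModel.load(new File(destDir, "empTitleTransformer").getAbsolutePath)
// The restored pipeline can be applied to new data exactly as before:
// val cleanedDf = restoredEmpTitleTransformer.transform(basicDataCleanup(newLoansDf))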
For H2O models, the situation is more complicated, since there are several ways to export a model: binary, POJO, and MOJO. The binary export is similar to the Spark export; however, to reuse an exported binary model, it is necessary to have a running instance of an H2O cluster. This limitation is removed by the other methods. The POJO export generates the model as Java source code, which can be compiled and run independently of an H2O cluster. Finally, the MOJO export stores the model in a binary form that can be interpreted and used without a running H2O cluster. In this chapter, we will use the MOJO export, since it is straightforward and is also the recommended method for model reuse:
loanStatusBaseModel4.getMojo.writeTo(new FileOutputStream(new File(destDir, "loanStatusModel.mojo")))
intRateModel.getMojo.writeTo(new FileOutputStream(new File(destDir, "intRateModel.mojo")))
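To illustrate why MOJOs are so handy, here is a minimal sketch of how the exported MOJO could later be scored without any running H2O cluster, using the h2o-genmodel library. The column names and values in the row are hypothetical placeholders:
import _root_.hex.genmodel.MojoModel
import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}

// Load the MOJO file and wrap it with the easy prediction API
val mojo = MojoModel.load(new File(destDir, "loanStatusModel.mojo").getAbsolutePath)
val wrapper = new EasyPredictModelWrapper(mojo)

// Fill a single row with input values (placeholder column names and values)
val row = new RowData()
row.put("loan_amnt", "12000")
row.put("purpose", "credit_card")

// Score the row; for a binomial model we obtain class probabilities
val loanStatusPrediction = wrapper.predictBinomial(row)
println(s"label = ${loanStatusPrediction.label}, " +
        s"probabilities = ${loanStatusPrediction.classProbabilities.mkString(", ")}")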
We can also export the Spark schema that defines the input data. This will be useful for the definition of a parser of the new data:
def saveSchema(schema: StructType, destFile: File, saveWithMetadata: Boolean = false) = {
  import java.nio.file.{Files, Paths, StandardOpenOption}
  import org.apache.spark.sql.types._

  val processedSchema = StructType(schema.map {
    case StructField(name, dtype, nullable, metadata) =>
      StructField(name, dtype, nullable, if (saveWithMetadata) metadata else Metadata.empty)
    case rec => rec
  })

  Files.write(Paths.get(destFile.toURI),
              processedSchema.json.getBytes(java.nio.charset.StandardCharsets.UTF_8),
              StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE)
}
saveSchema(loanDataDf.schema, new File(destDir, "inputSchema.json"))
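When we later build a parser for new data, we can restore the schema from the saved JSON with Spark's DataType.fromJson. The following is a minimal sketch; the spark session name and the input path in the commented line are hypothetical placeholders:
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{DataType, StructType}

// Read the schema JSON back and rebuild the Spark schema from it
val schemaJson = new String(
  Files.readAllBytes(Paths.get(new File(destDir, "inputSchema.json").toURI)),
  java.nio.charset.StandardCharsets.UTF_8)
val restoredSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
// The restored schema can now drive a reader for incoming loan applications, for example:
// val newLoansDf = spark.read.schema(restoredSchema).csv("path/to/new/loans")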