The missing-value handling step is straightforward, since we already explored the missing values and summarized the required transformations in the previous section. The following steps implement them.
First, we define a list of imputation values - for each column, a single Double value to be used as a replacement:
val imputedValues = columnNames.map { _ match {
  case "hr" => 60.0
  case _ => 0.0
}}
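For illustration only - assuming columnNames were, say, Array("colA", "hr", "colB") (placeholder names, not the actual dataset columns) - the mapping would produce:

// Hypothetical example with placeholder column names
// columnNames: Array("colA", "hr", "colB")
// imputedValues: Array[Double] = Array(0.0, 60.0, 0.0)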
Next, we define a function that allows us to inject these values into our dataset:
import org.apache.spark.rdd.RDD

def imputeNaN(
  data: RDD[Array[Double]],
  values: Array[Double]): RDD[Array[Double]] = {
  data.map { row =>
    row.indices.map { i =>
      if (row(i).isNaN) values(i) else row(i)
    }.toArray
  }
}
The function accepts a Spark RDD where each row is represented as an array of Double values, together with a parameter holding the replacement value for each column.
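As a quick usage sketch - the name rawData is assumed here for the parsed RDD[Array[Double]] built in the previous section:

// Hypothetical usage: rawData stands for the parsed RDD[Array[Double]]
val imputedData = imputeNaN(rawData, imputedValues)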
In the next step, we define a row filter - a method that removes all rows containing more missing values than a given threshold. Here, we can reuse the already computed nanCountPerRow value:
def filterBadRows(
  rdd: RDD[Array[Double]],
  nanCountPerRow: RDD[Int],
  nanThreshold: Int): RDD[Array[Double]] = {
  rdd.zip(nanCountPerRow).filter { case (_, nanCount) =>
    nanCount <= nanThreshold
  }.map { case (row, _) =>
    row
  }
}
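The two helpers can then be chained. The following sketch assumes rawData as before and an illustrative threshold of two missing values per row:

// Hypothetical usage: drop rows with more than 2 missing values,
// then replace the remaining NaNs with the per-column defaults
val cleanedData = imputeNaN(
  filterBadRows(rawData, nanCountPerRow, nanThreshold = 2),
  imputedValues)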