Missing values

Handling missing values is straightforward at this point, since we already explored them and summarized the required transformations in the previous section. The following steps implement those transformations.

First, we define a list of imputation values - for each column, we assign a single Double value that will replace its missing entries:

val imputedValues = columnNames.map {
  case "hr" => 60.0
  case _ => 0.0
}
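
To make the mapping concrete, here is a tiny self-contained sketch; the column names are hypothetical and serve only to show how the replacement values line up with the columns:

// Hypothetical column names, for illustration only
val exampleColumns = Array("season", "hr", "temp")
val exampleValues = exampleColumns.map {
  case "hr" => 60.0
  case _ => 0.0
}
// exampleValues: Array(0.0, 60.0, 0.0) - 60.0 aligns with the "hr" column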

And a function which allows us to inject these values into our dataset:

import org.apache.spark.rdd.RDD

def imputeNaN(
  data: RDD[Array[Double]],
  values: Array[Double]): RDD[Array[Double]] = {
  data.map { row =>
    row.indices.map { i =>
      // Replace a missing cell with the replacement value defined for column i
      if (row(i).isNaN) values(i)
      else row(i)
    }.toArray
  }
}

The defined function accepts a Spark RDD where each row is represented as an array of Double numbers, together with an array holding the replacement value for each column.
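
To see the function in action, here is a minimal sketch. It assumes an active SparkContext named sc and uses a small hand-made dataset, so all names and values are illustrative only:

// Illustrative only: a tiny two-row dataset containing missing values
val sample: RDD[Array[Double]] = sc.parallelize(Seq(
  Array(1.0, Double.NaN, 3.0),
  Array(Double.NaN, 2.0, Double.NaN)))
// Replacement values: 60.0 for the second column, 0.0 elsewhere
val replacements = Array(0.0, 60.0, 0.0)
imputeNaN(sample, replacements).collect().foreach(row => println(row.mkString(", ")))
// Output:
// 1.0, 60.0, 3.0
// 0.0, 2.0, 0.0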

In the next step, we define a row filter - a method which removes all rows containing more missing values than a given threshold. In this case, we can easily reuse the already computed nanCountPerRow value:

def filterBadRows(
  rdd: RDD[Array[Double]],
  nanCountPerRow: RDD[Int],
  nanThreshold: Int): RDD[Array[Double]] = {
  rdd.zip(nanCountPerRow).filter { case (row, nanCount) =>
    // Keep only rows whose number of missing values does not exceed the threshold
    nanCount <= nanThreshold
  }.map { case (row, _) =>
    row
  }
}
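
For example, assuming that the rawData RDD and nanCountPerRow from the previous section are in scope and that imputedValues is an Array[Double] (the threshold of 1 below is an illustrative choice), the two transformations can be chained:

// Drop rows with more than one missing value, then impute the rest
val filteredData = filterBadRows(rawData, nanCountPerRow, nanThreshold = 1)
val cleanData = imputeNaN(filteredData, imputedValues)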
Note that we parameterize the defined transformations. It is good practice to keep code flexible enough to permit further experimentation with parameters. On the other hand, it is also good to avoid building an overly complex framework. The rule of thumb is to parameterize the functionality which we would like to reuse in different contexts, or for which we need the freedom to configure constants.