The first step is to load the data. For multiple files, the SparkContext method wholeTextFiles provides the functionality we need. It reads each file as a single record and returns it as a key-value pair, where the key contains the location of the file and the value holds the file content. We can reference the input files directly via the wildcard pattern data/subject*. This is useful not only when loading files from a local filesystem, but especially when loading files from HDFS.
val path = s"${sys.env.get("DATADIR").getOrElse("data")}/subject*"
val dataFiles = sc.wholeTextFiles(path)
println(s"Number of input files: ${dataFiles.count}")
Since the column names are not part of the input data, we define a variable that is going to hold them:
val allColumnNames = Array(
  "timestamp", "activityId", "hr") ++ Array(
  "hand", "chest", "ankle").flatMap(sensor =>
    Array(
      "temp",
      "accel1X", "accel1Y", "accel1Z",
      "accel2X", "accel2Y", "accel2Z",
      "gyroX", "gyroY", "gyroZ",
      "magnetX", "magnetY", "magnetZ",
      // The orientation is a 4-element quaternion; it is marked as invalid in the dataset description and dropped below.
      "orientX", "orientY", "orientZ", "orientW").
      map(name => s"${sensor}_${name}"))
We simply defined the first three column names, followed by the column names for each of the three position sensors. Furthermore, we also prepared a list of column indexes that are useless for modeling, namely the timestamp and the orientation data:
val ignoredColumns = Array(0, 3 + 13, 3 + 14, 3 + 15, 3 + 16, 20 + 13, 20 + 14, 20 + 15, 20 + 16, 37 + 13, 37 + 14, 37 + 15, 37 + 16)
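The offsets follow directly from the layout above: each sensor block starts at index 3, 20, or 37, and its last four columns (local indexes 13 to 16) hold the orientation quaternion. The same array can therefore be derived programmatically (an equivalent sketch, not required by the rest of the chapter):
// Timestamp column plus the four orientation columns of each sensor block;
// yields the same indexes as the hand-written ignoredColumns above.
val ignoredColumnsDerived = Array(0) ++ Array(3, 20, 37).flatMap(offset => (13 to 16).map(offset + _))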
The next step is to process the content of the referenced files and create an RDD that we will use as input for data exploration and modeling. Since we expect to iterate over the data several times and perform different transformations, we are going to cache the data in memory:
val rawData = dataFiles.flatMap { case (path, content) =>
  // Split the content of each file into individual lines.
  content.split("\n")
}.map { row =>
  // Split each line on spaces, convert the cells to Double (NaN marks a missing value),
  // and drop the columns we decided to ignore.
  row.split(" ").
    map(_.trim).
    map(cell => if (cell.toUpperCase == "NAN") Double.NaN else cell.toDouble).
    zipWithIndex.
    collect { case (cell, idx) if !ignoredColumns.contains(idx) => cell }
}
rawData.cache()
println(s"Number of rows: ${rawData.count}")
The output is as follows:
In this case, for each key-value pair we extract its content and split it on line boundaries. Then we split each line on the file delimiter, which is a space between features. Since the files contain only numeric values and the string NaN as a marker for missing values, we can simply convert all values to Java's Double, keeping Double.NaN as the representation of a missing value.
We can see that our input data has 977,972 rows. During loading, we also skipped the timestamp column and the columns which were marked as invalid in the dataset description (see the ignoredColumns array).
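To make the per-line transformation concrete, here is the same parsing logic applied to a single, hypothetical line (the values are invented for illustration, and the column filtering is omitted for brevity):
// Hypothetical sample line; real lines contain 54 space-separated values.
val sampleLine = "3.38 1 104.0 30.375 NaN 2.21"
val parsedSample = sampleLine.split(" ").map(_.trim).
  map(cell => if (cell.toUpperCase == "NAN") Double.NaN else cell.toDouble)
// parsedSample: Array(3.38, 1.0, 104.0, 30.375, NaN, 2.21)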
To keep our view of the dataset consistent, we also need to filter the column names based on the list of ignored columns that we prepared in the previous step:
import org.apache.spark.utils.Tabulizer._
val columnNames = allColumnNames.
zipWithIndex.
filter { case (_, idx) => !ignoredColumns.contains(idx) }.
map { case (name, _) => name }
println(s"Column names:${table(columnNames, 4, None)}")
The output is as follows:
Intuitively, modeling identifier-like columns such as the timestamp does not make a lot of sense given the nature of the field. Feature selection is a hugely important topic, and one that we will spend a great deal of time on later in the book.
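Before moving on, a quick consistency check (a small sketch, assuming every input line carries the full 54 columns described in the dataset documentation) confirms that the filtered names line up with the filtered data rows:
// 54 names minus 13 ignored columns should leave 41 columns for modeling.
require(columnNames.length == allColumnNames.length - ignoredColumns.length)
// Each parsed row should contain exactly one cell per remaining column name.
require(rawData.first.length == columnNames.length)
println(s"Columns kept for modeling: ${columnNames.length}")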
Now, let's look at the distribution of the individual activities in our dataset. We are going to use the same trick as in the previous chapter; however, we would also like to see the actual names of the activities instead of their pure number-based representation. Hence, we first define a mapping describing the relation between an activity number and its name:
val activitiesMap = Map(
  1 -> "lying", 2 -> "sitting", 3 -> "standing", 4 -> "walking",
  5 -> "running", 6 -> "cycling", 7 -> "Nordic walking",
  9 -> "watching TV", 10 -> "computer work", 11 -> "car driving",
  12 -> "ascending stairs", 13 -> "descending stairs",
  16 -> "vacuum cleaning", 17 -> "ironing", 18 -> "folding laundry",
  19 -> "house cleaning", 20 -> "playing soccer",
  24 -> "rope jumping", 0 -> "other")
Then we compute the frequency of the individual activities in the data with the help of the Spark method reduceByKey:
val dataActivityId = rawData.map(l => l(0).toInt)
val activityIdCounts = dataActivityId.
map(n => (n, 1)).
reduceByKey(_ + _)
val activityCounts = activityIdCounts.
collect.
sortBy { case (activityId, count) =>
-count
}.map { case (activityId, count) =>
(activitiesMap(activityId), count)
}
println(s"Activities distribution:${table({activityCounts})}")
The command computes the counts of the individual activities, sorts them in descending order based on the counts, and translates the activity numbers to their labels:
The same distribution can also be visualized based on the activity frequencies, as shown in Figure 2.
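As a side note, the same counts can be obtained without the explicit map and reduceByKey pair by using Spark's countByValue action, which returns the result directly to the driver; a minimal alternative sketch:
// countByValue returns the counts to the driver as a Map[Int, Long].
val activityCountsAlt = dataActivityId.countByValue().toSeq.
  sortBy { case (_, count) => -count }.
  map { case (activityId, count) => (activitiesMap(activityId), count) }
Since there are only a handful of distinct activities, collecting the counts to the driver is cheap; for high-cardinality keys, keeping the intermediate activityIdCounts as an RDD, as we did above, avoids materializing the whole result on the driver.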