While Spark provides MLlib as a library for machine learning, in many practical situations R or Python offer a more familiar and time-tested interface for statistical computations. In particular, R's extensive statistical library includes the very popular ANOVA/MANOVA methods for analyzing variance and variable dependencies/independencies, sets of statistical tests, and random number generators that are not currently present in MLlib. The interface from R to Spark is available as the SparkR project. Finally, data analysts know Python's NumPy and SciPy linear algebra implementations for their efficiency, as well as its time-series, optimization, and signal processing packages. With R/Python integration, all these familiar functionalities can be exposed to Scala/Spark users until the Spark/MLlib interfaces stabilize and the libraries make their way into the new framework, while still benefiting from Spark's ability to execute workflows in a distributed way across multiple machines.
When people program in R or Python, or with any statistical or linear algebra package for that matter, they are usually not specifically focused on the functional programming aspects. As I mentioned in Chapter 1, Exploratory Data Analysis, Scala should be treated as a high-level language, and this is where it shines. Integration with highly efficient C and Fortran implementations, for example, the freely available Basic Linear Algebra Subprograms (BLAS), Linear Algebra Package (LAPACK), and Arnoldi Package (ARPACK), is already available to Java and thus Scala (http://www.netlib.org, https://github.com/fommil/netlib-java). I would like to leave Scala doing what it does best. In this chapter, however, I will focus on how to use these languages with Scala/Spark.
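For instance, the netlib-java bindings mentioned above can be called from Scala directly. The following is a minimal sketch; it assumes a netlib-java artifact such as com.github.fommil.netlib:all is on the classpath (check the project page for the current coordinates):

import com.github.fommil.netlib.BLAS

// ddot computes the dot product of two double vectors; netlib-java picks up a
// native BLAS implementation if one is installed, or falls back to a JVM one.
val x = Array(1.0, 2.0, 3.0)
val y = Array(4.0, 5.0, 6.0)
val dot = BLAS.getInstance.ddot(x.length, x, 1, y, 1)
println(dot)  // expected: 32.0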
I will use the publicly available United States Department of Transportation flights dataset for this chapter (http://www.transtats.bts.gov).
In this chapter, we will cover the following topics:

- Setting up R and SparkR on Linux, Mac OS, and Windows
- Working with SparkR DataFrames on the flight delay dataset
- Linear models, generalized linear regression, and ANOVA in R and SparkR
- Reading and writing JSON and Parquet files from SparkR
- Calling Scala from R, and R from Scala with Rscript and Rserve
As with many advanced and carefully designed technologies, people usually either love or hate R as a language. One of the reasons is that R was one of the first language implementations that tried to manipulate complex objects, even though most of them turn out to be just lists, as opposed to the structs or maps of more mature modern implementations. R was originally created at the University of Auckland by Ross Ihaka and Robert Gentleman around 1993, and has its roots in the S language developed at Bell Labs around 1976, when most commercial programming was still done in Fortran. While R incorporates some functional features, such as passing functions as parameters and map/apply, it conspicuously misses some others, such as lazy evaluation and list comprehensions. With all this said, R has a very good help system, and if someone says that they never had to go back to the help(…) command to figure out how to run a certain data transformation or model better, they are either lying or just starting with R.
To run SparkR, you'll need R version 3.0 or later. Follow the given instructions for the installation, depending on your operating system.
On a Linux system, detailed installation documentation is available at https://cran.r-project.org/bin/linux. For example, on a Debian system, one installs it by running the following commands:
# apt-get update
...
# apt-get install r-base r-base-dev
...
To list the packages installed or available from the Linux repository, run the following command:
# apt-cache search "^r-.*" | sort ...
The R packages that are part of r-base and r-recommended are installed into the /usr/lib/R/library directory. These can be updated using the usual package maintenance tools, such as apt-get or aptitude. The other R packages available as precompiled Debian packages, r-cran-* and r-bioc-*, are installed into /usr/lib/R/site-library. The following command shows all packages that depend on r-base-core:
# apt-cache rdepends r-base-core
This comprises a large number of contributed packages from CRAN and other repositories. If you want to install R packages that are not provided as Debian packages, or if you want to use newer versions, you need to build them from source, which requires the r-base-dev development package. It can be installed by the following command:
# apt-get install r-base-dev
This pulls in the basic requirements to compile R packages, such as the development tools group. R packages may then be installed by the local user/admin from the CRAN source packages, typically from inside R using the install.packages() function or with R CMD INSTALL. For example, to install the R ggplot2 package, run the following command:
> install.packages("ggplot2") --- Please select a CRAN mirror for use in this session --- also installing the dependencies 'stringi', 'magrittr', 'colorspace', 'Rcpp', 'stringr', 'RColorBrewer', 'dichromat', 'munsell', 'labeling', 'digest', 'gtable', 'plyr', 'reshape2', 'scales'
This will download and optionally compile the package and its dependencies from one of the available sites. Sometimes R gets confused about the repositories; in this case, I recommend creating a ~/.Rprofile file in your home directory pointing to the closest CRAN repository:
$ cat >> ~/.Rprofile << EOF
r = getOption("repos") # hard code the Berkeley repo for CRAN
r["CRAN"] = "http://cran.cnr.berkeley.edu"
options(repos = r)
rm(r)
EOF
~/.Rprofile contains commands to customize your sessions. One of the commands I recommend putting there is options(prompt="R> "), so that you can tell which shell you are working in by the prompt, following the tradition of most tools in this book. The list of known mirrors is available at https://cran.r-project.org/mirrors.html.
Also, unless your OS setup does this already, it is good practice to specify the directories for the system/site/user packages by putting the following commands into ~/.bashrc or the system-wide /etc/profile:
$ export R_LIBS_SITE=${R_LIBS_SITE:-/usr/local/lib/R/site-library:/usr/lib/R/site-library:/usr/lib/R/library}
$ export R_LIBS_USER=${R_LIBS_USER:-$HOME/R/$(uname -i)-library/$(R --version | grep -o -E '[0-9]+\.[0-9]+' | head -1)}
R for Mac OS can be downloaded, for example, from http://cran.r-project.org/bin/macosx. The latest version at the time of writing is 3.2.3. Always check the consistency of the downloaded package. To do so, run the following command:
$ pkgutil --check-signature R-3.2.3.pkg
Package "R-3.2.3.pkg":
   Status: signed by a certificate trusted by Mac OS X
   Certificate Chain:
    1. Developer ID Installer: Simon Urbanek
       SHA1 fingerprint: B7 EB 39 5E 03 CF 1E 20 D1 A6 2E 9F D3 17 90 26 D8 D6 3B EF
       -----------------------------------------------------------------------------
    2. Developer ID Certification Authority
       SHA1 fingerprint: 3B 16 6C 3B 7D C4 B7 51 C9 FE 2A FA B9 13 56 41 E3 88 E1 86
       -----------------------------------------------------------------------------
    3. Apple Root CA
       SHA1 fingerprint: 61 1E 5B 66 2C 59 3A 08 FF 58 D1 4A E2 24 52 D1 98 DF 6C 60
The environment settings in the preceding subsection also apply to the Mac OS setup.
R for Windows can be downloaded from https://cran.r-project.org/bin/windows/ as an exe installer. Run this executable as an administrator to install R.
One can usually edit the environment settings for the system or user by following the Control Panel | System and Security | System | Advanced system settings | Environment Variables path from the Windows menu.
To run SparkR, one needs to run the R/install-dev.sh script that comes with the Spark git tree. In fact, one only needs the shell script and the content of the R/pkg directory, which is not always included with the compiled Spark distributions:
$ git clone https://github.com/apache/spark.git
Cloning into 'spark'...
remote: Counting objects: 301864, done.
...
$ cp -r R/{install-dev.sh,pkg} $SPARK_HOME/R
...
$ cd $SPARK_HOME
$ ./R/install-dev.sh
* installing *source* package 'SparkR' ...
** R
** inst
** preparing package for lazy loading
Creating a new generic function for 'colnames' in package 'SparkR'
...
$ bin/sparkR

R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/alex/spark-1.6.1-bin-hadoop2.6/bin/spark-submit "sparkr-shell" /tmp/RtmpgdTfmU/backend_port22446d0391e8

Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  1.6.1
    /_/

Spark context is available as sc, SQL context is available as sqlContext
>
Alternatively, we can also initialize Spark from the R command line directly (or from RStudio at http://rstudio.org/) using the following commands:
R> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
...
R> sc <- sparkR.init(master = Sys.getenv("SPARK_MASTER"), sparkEnvir = list(spark.driver.memory="1g"))
...
R> sqlContext <- sparkRSQL.init(sc)
As described previously in Chapter 3, Working with Spark and MLlib, the SPARK_HOME environment variable needs to point to your local Spark installation directory, while SPARK_MASTER and YARN_CONF_DIR point to the desired cluster manager (local, standalone, Mesos, or YARN) and to the YARN configuration directory if you are using Spark with the YARN cluster manager.
Although almost all of the distributions come with a UI, in the tradition of this book and for the purpose of this chapter, I'll use the command line.
DataFrames originally came from R and Python, so it is natural to see them in SparkR.

The question of when and where to store and apply the schema and other metadata, such as types, has been a topic of active debate recently. On one hand, providing the schema early, together with the data, enables thorough data validation and potentially optimization. On the other hand, it may be too restrictive for the original data ingest, whose goal is just to capture as much data as possible and perform data formatting/cleansing later on, an approach often referred to as schema on read. The latter approach has recently won more ground with tools for working with evolving schemas, such as Avro, and automatic schema discovery tools, but for the purpose of this chapter, I'll assume that we have done the schema discovery part and can start working with DataFrames.
Let's first download and extract a flight delay dataset from the United States Department of Transportation, as follows:
$ wget http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_2015_7.zip --2016-01-23 15:40:02-- http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_2015_7.zip Resolving www.transtats.bts.gov... 204.68.194.70 Connecting to www.transtats.bts.gov|204.68.194.70|:80... connected. HTTP request sent, awaiting response... 200 OK Length: 26204213 (25M) [application/x-zip-compressed] Saving to: "On_Time_On_Time_Performance_2015_7.zip" 100%[====================================================================================================================================================================================>] 26,204,213 966K/s in 27s 2016-01-23 15:40:29 (956 KB/s) - "On_Time_On_Time_Performance_2015_7.zip" saved [26204213/26204213] $ unzip -d flights On_Time_On_Time_Performance_2015_7.zip Archive: On_Time_On_Time_Performance_2015_7.zip inflating: flights/On_Time_On_Time_Performance_2015_7.csv inflating: flights/readme.html
If you have Spark running on a cluster, you will want to copy the files into HDFS:
$ hadoop fs -put flights .
The flights/readme.html file gives you detailed metadata information, as shown in the following image:
Now, I want to analyze the delays of flights returning to SFO and possibly find the factors contributing to the delay. Let's start with the R data.frame:
$ bin/sparkR --master local[8]

R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-apple-darwin13.4.0 (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

[Previously saved workspace restored]

Launching java with spark-submit command /Users/akozlov/spark-1.6.1-bin-hadoop2.6/bin/spark-submit "--master" "local[8]" "sparkr-shell" /var/folders/p1/y7ygx_4507q34vhd60q115p80000gn/T//RtmpD42eTz/backend_port682e58e2c5db

Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  1.6.1
    /_/

Spark context is available as sc, SQL context is available as sqlContext
> flights <- read.table(unz("On_Time_On_Time_Performance_2015_7.zip", "On_Time_On_Time_Performance_2015_7.csv"), nrows=1000000, header=T, quote="\"", sep=",")
> sfoFlights <- flights[flights$Dest == "SFO", ]
> attach(sfoFlights)
> delays <- aggregate(ArrDelayMinutes ~ DayOfWeek + Origin + UniqueCarrier, FUN=mean, na.rm=TRUE)
> tail(delays[order(delays$ArrDelayMinutes), ])
    DayOfWeek Origin UniqueCarrier ArrDelayMinutes
220         4    ABQ            OO           67.60
489         4    TUS            OO           71.80
186         5    IAH            F9           77.60
696         3    RNO            UA           79.50
491         6    TUS            OO          168.25
84          7    SLC            AS          203.25
If you were flying from Salt Lake City on Sunday with Alaska Airlines in July 2015, consider yourself unlucky (we have only done simple analysis so far, so one shouldn't attach too much significance to this result). There may be multiple other random factors contributing to the delay.
Even though we ran the example in SparkR, we still used the R data.frame. If we want to analyze data across multiple months, we will need to distribute the load across multiple nodes. This is where the distributed SparkR DataFrame comes into play, as it can be distributed across multiple threads even on a single node. There is a direct way to convert an R data.frame to a SparkR DataFrame (and thus to an RDD):
> sparkDf <- createDataFrame(sqlContext, flights)
If I run this on a laptop, I will run out of memory. The overhead is large because the data needs to be transferred between multiple threads/nodes, so we want to filter it as early as possible:
sparkDf <- createDataFrame(sqlContext, subset(flights, select = c("ArrDelayMinutes", "DayOfWeek", "Origin", "Dest", "UniqueCarrier")))
This will run even on my laptop. There is, of course, a reverse conversion from a Spark DataFrame to an R data.frame:
> rDf <- as.data.frame(sparkDf)
Alternatively, I can use the spark-csv package to read the data from the .csv file which, if the original .csv file is in a distributed filesystem such as HDFS, avoids shuffling the data over the network in a cluster setting. The only drawback, at least currently, is that Spark cannot read .zip files directly:
> $ ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.3.0 --master local[8] R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree" Copyright (C) 2015 The R Foundation for Statistical Computing Platform: x86_64-redhat-linux-gnu (64-bit) R is free software and comes with ABSOLUTELY NO WARRANTY. You are welcome to redistribute it under certain conditions. Type 'license()' or 'licence()' for distribution details. Natural language support but running in an English locale R is a collaborative project with many contributors. Type 'contributors()' for more information and 'citation()' on how to cite R or R packages in publications. Type 'demo()' for some demos, 'help()' for on-line help, or 'help.start()' for an HTML browser interface to help. Type 'q()' to quit R. Warning: namespace 'SparkR' is not available and has been replaced by .GlobalEnv when processing object 'sparkDf' [Previously saved workspace restored] Launching java with spark-submit command /home/alex/spark-1.6.1-bin-hadoop2.6/bin/spark-submit "--master" "local[8]" "--packages" "com.databricks:spark-csv_2.10:1.3.0" "sparkr-shell" /tmp/RtmpfhcUXX/backend_port1b066bea5a03 Ivy Default Cache set to: /home/alex/.ivy2/cache The jars for the packages stored in: /home/alex/.ivy2/jars :: loading settings :: url = jar:file:/home/alex/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml com.databricks#spark-csv_2.10 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 confs: [default] found com.databricks#spark-csv_2.10;1.3.0 in central found org.apache.commons#commons-csv;1.1 in central found com.univocity#univocity-parsers;1.5.1 in central :: resolution report :: resolve 189ms :: artifacts dl 4ms :: modules in use: com.databricks#spark-csv_2.10;1.3.0 from central in [default] com.univocity#univocity-parsers;1.5.1 from central in [default] org.apache.commons#commons-csv;1.1 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 3 | 0 | 0 | 0 || 3 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent confs: [default] 0 artifacts copied, 3 already retrieved (0kB/7ms) Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_ version 1.6.1 /_/ Spark context is available as sc, SQL context is available as sqlContext > sparkDf <- read.df(sqlContext, "./flights", "com.databricks.spark.csv", header="true", inferSchema = "false") > sfoFlights <- select(filter(sparkDf, sparkDf$Dest == "SFO"), "DayOfWeek", "Origin", "UniqueCarrier", "ArrDelayMinutes") > aggs <- agg(group_by(sfoFlights, "DayOfWeek", "Origin", "UniqueCarrier"), count(sparkDf$ArrDelayMinutes), avg(sparkDf$ArrDelayMinutes)) > head(arrange(aggs, c('avg(ArrDelayMinutes)'), decreasing = TRUE), 10) DayOfWeek Origin UniqueCarrier count(ArrDelayMinutes) avg(ArrDelayMinutes) 1 7 SLC AS 4 203.25 2 6 TUS OO 4 168.25 3 3 RNO UA 8 79.50 4 5 IAH F9 5 77.60 5 4 TUS OO 5 71.80 6 4 ABQ OO 5 67.60 7 2 ABQ OO 4 66.25 8 1 IAH F9 4 61.25 9 4 DAL WN 5 59.20 10 3 SUN OO 5 59.00
Note that we loaded the additional com.databricks:spark-csv_2.10:1.3.0 package by supplying the --packages flag on the command line; we can now easily go distributed by using a Spark instance over a cluster of nodes, or even analyze a larger dataset:
$ for i in $(seq 1 6); do wget http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_2015_$i.zip; unzip -d flights On_Time_On_Time_Performance_2015_$i.zip; hadoop fs -put -f flights/On_Time_On_Time_Performance_2015_$i.csv flights; done
$ hadoop fs -ls flights
Found 7 items
-rw-r--r--   3 alex eng  211633432 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_1.csv
-rw-r--r--   3 alex eng  192791767 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_2.csv
-rw-r--r--   3 alex eng  227016932 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_3.csv
-rw-r--r--   3 alex eng  218600030 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_4.csv
-rw-r--r--   3 alex eng  224003544 2016-02-16 03:29 flights/On_Time_On_Time_Performance_2015_5.csv
-rw-r--r--   3 alex eng  227418780 2016-02-16 03:29 flights/On_Time_On_Time_Performance_2015_6.csv
-rw-r--r--   3 alex eng  235037955 2016-02-15 21:56 flights/On_Time_On_Time_Performance_2015_7.csv
This will download and put the on-time performance data in the flights directory (remember, as we discussed in Chapter 1, Exploratory Data Analysis, we would like to treat directories as big data datasets). We can now run the same analysis over the whole available period of 2015:
> sparkDf <- read.df(sqlContext, "./flights", "com.databricks.spark.csv", header="true")
> sfoFlights <- select(filter(sparkDf, sparkDf$Dest == "SFO"), "DayOfWeek", "Origin", "UniqueCarrier", "ArrDelayMinutes")
> aggs <- cache(agg(group_by(sfoFlights, "DayOfWeek", "Origin", "UniqueCarrier"), count(sparkDf$ArrDelayMinutes), avg(sparkDf$ArrDelayMinutes)))
> head(arrange(aggs, c('avg(ArrDelayMinutes)'), decreasing = TRUE), 10)
   DayOfWeek Origin UniqueCarrier count(ArrDelayMinutes) avg(ArrDelayMinutes)
1          6    MSP            UA                      1            122.00000
2          3    RNO            UA                      8             79.50000
3          1    MSP            UA                     13             68.53846
4          7    SAT            UA                      1             65.00000
5          7    STL            UA                      9             64.55556
6          1    ORD            F9                     13             55.92308
7          1    MSO            OO                      4             50.00000
8          2    MSO            OO                      4             48.50000
9          5    CEC            OO                     28             45.86957
10         3    STL            UA                     13             43.46154
Note that we used a cache() call to pin the dataset in memory, as we will use it again later. This time it's Minneapolis/United on Saturday! However, you can probably already see why: there is only one record for this combination of DayOfWeek, Origin, and UniqueCarrier; it is most likely an outlier. The average over about 30 flights for the previous outlier drops to around 30 minutes:
> head(arrange(filter(filter(aggs, aggs$Origin == "SLC"), aggs$UniqueCarrier == "AS"), c('avg(ArrDelayMinutes)'), decreasing = TRUE), 100)
  DayOfWeek Origin UniqueCarrier count(ArrDelayMinutes) avg(ArrDelayMinutes)
1         7    SLC            AS                     30            32.600000
2         2    SLC            AS                     30            10.200000
3         4    SLC            AS                     31             9.774194
4         1    SLC            AS                     30             9.433333
5         3    SLC            AS                     30             5.866667
6         5    SLC            AS                     31             5.516129
7         6    SLC            AS                     30             2.133333
Sunday still remains a problem in terms of delays. The only limit to the amount of data we can analyze now is the number of cores on the laptop and the number of nodes in the cluster. Let's now look at more complex machine learning models.
Linear methods play an important role in statistical modeling. As the name suggests, a linear model assumes that the dependent variable is a weighted combination of the independent variables. In R, the lm function performs a linear regression and reports the coefficients, as follows:
R> attach(iris)
R> lm(Sepal.Length ~ Sepal.Width)

Call:
lm(formula = Sepal.Length ~ Sepal.Width)

Coefficients:
(Intercept)  Sepal.Width
     6.5262      -0.2234
The summary function provides even more information:
R> model <- lm(Sepal.Length ~ Sepal.Width + Petal.Length + Petal.Width)
R> summary(model)

Call:
lm(formula = Sepal.Length ~ Sepal.Width + Petal.Length + Petal.Width)

Residuals:
     Min       1Q   Median       3Q      Max
-0.82816 -0.21989  0.01875  0.19709  0.84570

Coefficients:
             Estimate Std. Error t value Pr(>|t|)
(Intercept)   1.85600    0.25078   7.401 9.85e-12 ***
Sepal.Width   0.65084    0.06665   9.765  < 2e-16 ***
Petal.Length  0.70913    0.05672  12.502  < 2e-16 ***
Petal.Width  -0.55648    0.12755  -4.363 2.41e-05 ***
---
Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

Residual standard error: 0.3145 on 146 degrees of freedom
Multiple R-squared:  0.8586,    Adjusted R-squared:  0.8557
F-statistic: 295.5 on 3 and 146 DF,  p-value: < 2.2e-16
While we considered generalized linear models in Chapter 3, Working with Spark and MLlib, and we will also consider the glm implementation in R and SparkR shortly, linear models provide more information in general and are an excellent tool for working with noisy data and selecting the relevant attributes for further analysis.
Data analysis life cycle
While most statistical books focus on the analysis and best use of the available data, the results of statistical analysis should in general also affect the search for new sources of information. In the complete data life cycle, discussed at the end of Chapter 3, Working with Spark and MLlib, a data scientist should always transform the latest variable importance results into theories about what data to collect next. For example, if an ink usage analysis for home printers points to an increase in ink usage for photos, one could potentially collect more information about the format of the pictures, the sources of the digital images, and the paper the user prefers to use. This approach turned out to be very productive in a real business situation, even though it was not fully automated.
Specifically, here is a short description of the output that linear models provide:

- Residuals: These are the differences between the observed and the fitted values; they can be retrieved with the resid(model) function.
- Coefficients: These are the weights assigned to the independent variables; they can be retrieved with the coef(model) function.

Once the linear models are applied, usually more complex glm or recursive models, such as decision trees and the rpart function, are applied to find interesting variable interactions. Linear models are good for establishing a baseline that the other models can improve on.
Finally, ANOVA is a standard technique for studying the variance when the independent variables are discrete:
R> aov <- aov(Sepal.Length ~ Species)
R> summary(aov)
             Df Sum Sq Mean Sq F value Pr(>F)
Species       2  63.21  31.606   119.3 <2e-16 ***
Residuals   147  38.96   0.265
---
Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
The measure of the model quality is the F-statistic. While one can always run R algorithms over an RDD using the pipe mechanism with Rscript, I will partially cover this functionality with respect to Java Specification Request (JSR) 223 Python integration later. In this section, I would like to explore specifically the generalized linear regression glm function, which is implemented natively both in R and in SparkR.
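As a quick illustration of the pipe mechanism just mentioned, here is a minimal Scala sketch; the ./double.R script name is hypothetical, and any executable R script that reads stdin and writes stdout will do:

// Each element of the RDD is written to the external script's stdin as a line;
// every line the script prints to stdout becomes an element of the result RDD.
// A hypothetical ./double.R could look like:
//   #!/usr/bin/env Rscript
//   con <- file("stdin")
//   for (l in readLines(con)) cat(2 * as.numeric(l), "\n")
val piped = sc.parallelize(1 to 10)
  .map(_.toString)
  .pipe("./double.R")

piped.collect().foreach(println)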
Once again, you can run either R glm or SparkR glm. The following table shows the possible family, variance, and link functions for the R glm implementation:
Family | Variance | Link
---|---|---
gaussian | gaussian | identity
binomial | binomial | logit, probit or cloglog
poisson | poisson | log, identity or sqrt
Gamma | Gamma | inverse, identity or log
inverse.gaussian | inverse.gaussian | 1/mu^2
quasi | user-defined | user-defined
I will use a binary target, ArrDel15, which indicates whether the plane was more than 15 minutes late for the arrival. The independent variables will be DepDel15, DayOfWeek, Month, UniqueCarrier, Origin, and Dest:
R> flights <- read.table(unz("On_Time_On_Time_Performance_2015_7.zip", "On_Time_On_Time_Performance_2015_7.csv"), nrows=1000000, header=T, quote="\"", sep=",")
R> flights$DoW_ <- factor(flights$DayOfWeek,levels=c(1,2,3,4,5,6,7), labels=c("Mon","Tue","Wed","Thu","Fri","Sat","Sun"))
R> attach(flights)
R> system.time(model <- glm(ArrDel15 ~ UniqueCarrier + DoW_ + Origin + Dest, flights, family="binomial"))
While you wait for the results, open another shell and run glm in SparkR mode on the full seven months of data:
sparkR> cache(sparkDf <- read.df(sqlContext, "./flights", "com.databricks.spark.csv", header="true", inferSchema="true")) DataFrame[Year:int, Quarter:int, Month:int, DayofMonth:int, DayOfWeek:int, FlightDate:string, UniqueCarrier:string, AirlineID:int, Carrier:string, TailNum:string, FlightNum:int, OriginAirportID:int, OriginAirportSeqID:int, OriginCityMarketID:int, Origin:string, OriginCityName:string, OriginState:string, OriginStateFips:int, OriginStateName:string, OriginWac:int, DestAirportID:int, DestAirportSeqID:int, DestCityMarketID:int, Dest:string, DestCityName:string, DestState:string, DestStateFips:int, DestStateName:string, DestWac:int, CRSDepTime:int, DepTime:int, DepDelay:double, DepDelayMinutes:double, DepDel15:double, DepartureDelayGroups:int, DepTimeBlk:string, TaxiOut:double, WheelsOff:int, WheelsOn:int, TaxiIn:double, CRSArrTime:int, ArrTime:int, ArrDelay:double, ArrDelayMinutes:double, ArrDel15:double, ArrivalDelayGroups:int, ArrTimeBlk:string, Cancelled:double, CancellationCode:string, Diverted:double, CRSElapsedTime:double, ActualElapsedTime:double, AirTime:double, Flights:double, Distance:double, DistanceGroup:int, CarrierDelay:double, WeatherDelay:double, NASDelay:double, SecurityDelay:double, LateAircraftDelay:double, FirstDepTime:int, TotalAddGTime:double, LongestAddGTime:double, DivAirportLandings:int, DivReachedDest:double, DivActualElapsedTime:double, DivArrDelay:double, DivDistance:double, Div1Airport:string, Div1AirportID:int, Div1AirportSeqID:int, Div1WheelsOn:int, Div1TotalGTime:double, Div1LongestGTime:double, Div1WheelsOff:int, Div1TailNum:string, Div2Airport:string, Div2AirportID:int, Div2AirportSeqID:int, Div2WheelsOn:int, Div2TotalGTime:double, Div2LongestGTime:double, Div2WheelsOff:string, Div2TailNum:string, Div3Airport:string, Div3AirportID:string, Div3AirportSeqID:string, Div3WheelsOn:string, Div3TotalGTime:string, Div3LongestGTime:string, Div3WheelsOff:string, Div3TailNum:string, Div4Airport:string, Div4AirportID:string, Div4AirportSeqID:string, Div4WheelsOn:string, Div4TotalGTime:string, Div4LongestGTime:string, Div4WheelsOff:string, Div4TailNum:string, Div5Airport:string, Div5AirportID:string, Div5AirportSeqID:string, Div5WheelsOn:string, Div5TotalGTime:string, Div5LongestGTime:string, Div5WheelsOff:string, Div5TailNum:string, :string] sparkR> noNulls <- cache(dropna(selectExpr(filter(sparkDf, sparkDf$Cancelled == 0), "ArrDel15", "UniqueCarrier", "format_string('%d', DayOfWeek) as DayOfWeek", "Origin", "Dest"), "any")) sparkR> sparkModel = glm(ArrDel15 ~ UniqueCarrier + DayOfWeek + Origin + Dest, noNulls, family="binomial")
Here we try to build a model explaining delays as an effect of the carrier, the day of week, and the origin and destination airports, which is captured by the formula construct ArrDel15 ~ UniqueCarrier + DayOfWeek + Origin + Dest.
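The same R-style formula notation is also available from Scala through Spark ML's RFormula transformer; the following is a minimal sketch that assumes a DataFrame named flightsDf containing the columns used in the formula:

import org.apache.spark.ml.feature.RFormula

// RFormula encodes the categorical variables and assembles the feature vector,
// mirroring the formula we passed to glm in R/SparkR.
val formula = new RFormula()
  .setFormula("ArrDel15 ~ UniqueCarrier + DayOfWeek + Origin + Dest")
  .setFeaturesCol("features")
  .setLabelCol("label")

val encoded = formula.fit(flightsDf).transform(flightsDf)
encoded.select("features", "label").show(5)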
Nulls, big data, and Scala
Note that in the SparkR case of glm, I had to explicitly filter out the cancelled flights and remove the NAs, or nulls in the C/Java lingo. While R does this for you by default, NAs in big data are very common, as the datasets are typically sparse, and they shouldn't be treated lightly. The fact that we have to deal with nulls explicitly in MLlib makes us aware of this additional information in the dataset and is definitely a welcome feature. The presence of an NA can carry information about the way the data was collected. Ideally, each NA should be accompanied by a small get_na_info method explaining why this particular value was not available or collected, which leads us to the Either type in Scala.
Even though nulls are inherited from Java and are a part of Scala, the Option and Either types are a newer and more robust mechanism to deal with the special cases where nulls were traditionally used. Specifically, Either can provide either a value or an exception message explaining why it was not computed, while Option can either provide a value or be None, both of which are readily captured by the Scala pattern-matching framework.
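To illustrate the idea (the type name and the missing-value reasons below are made up for the example), a delay measurement with an explicit missing-value reason could be modeled as follows:

// Right holds an observed delay, Left carries the reason why it is missing,
// playing the role of the hypothetical get_na_info discussed above.
type DelayReading = Either[String, Double]

val readings: Seq[DelayReading] = Seq(
  Right(12.5),
  Left("flight cancelled"),
  Right(7.0),
  Left("value not reported by carrier")
)

// Pattern matching forces us to handle the missing case explicitly.
readings.foreach {
  case Right(minutes) => println(s"delay: $minutes")
  case Left(reason)   => println(s"NA: $reason")
}

// Option is the simpler alternative when the reason is not needed.
val maybeDelay: Option[Double] = None
println(maybeDelay.getOrElse(0.0))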
One thing you will notice is that SparkR runs multiple threads; even on a single node, it consumes CPU time from multiple cores and returns much faster, even with a larger amount of data. In my experiment on a 32-core machine, it was able to finish in under a minute (as opposed to 35 minutes for R glm). To get the results, as in the R model case, we need to run the summary() method:
> summary(sparkModel)
$coefficients
                     Estimate
(Intercept)      -1.518542340
UniqueCarrier_WN  0.382722232
UniqueCarrier_DL -0.047997652
UniqueCarrier_OO  0.367031995
UniqueCarrier_AA  0.046737727
UniqueCarrier_EV  0.344539788
UniqueCarrier_UA  0.299290120
UniqueCarrier_US  0.069837542
UniqueCarrier_MQ  0.467597761
UniqueCarrier_B6  0.326240578
UniqueCarrier_AS -0.210762769
UniqueCarrier_NK  0.841185903
UniqueCarrier_F9  0.788720078
UniqueCarrier_HA -0.094638586
DayOfWeek_5       0.232234937
DayOfWeek_4       0.274016179
DayOfWeek_3       0.147645473
DayOfWeek_1       0.347349366
DayOfWeek_2       0.190157420
DayOfWeek_7       0.199774806
Origin_ATL       -0.180512251
...
The worst performer is NK (Spirit Airlines). Internally, SparkR uses limited-memory BFGS, a limited-memory quasi-Newton optimization method, and produces results similar to the ones obtained with R glm on the July data:
R> summary(model) Call: glm(formula = ArrDel15 ~ UniqueCarrier + DoW + Origin + Dest, family = "binomial", data = dow) Deviance Residuals: Min 1Q Median 3Q Max -1.4205 -0.7274 -0.6132 -0.4510 2.9414 Coefficients: Estimate Std. Error z value Pr(>|z|) (Intercept) -1.817e+00 2.402e-01 -7.563 3.95e-14 *** UniqueCarrierAS -3.296e-01 3.413e-02 -9.658 < 2e-16 *** UniqueCarrierB6 3.932e-01 2.358e-02 16.676 < 2e-16 *** UniqueCarrierDL -6.602e-02 1.850e-02 -3.568 0.000359 *** UniqueCarrierEV 3.174e-01 2.155e-02 14.728 < 2e-16 *** UniqueCarrierF9 6.754e-01 2.979e-02 22.668 < 2e-16 *** UniqueCarrierHA 7.883e-02 7.058e-02 1.117 0.264066 UniqueCarrierMQ 2.175e-01 2.393e-02 9.090 < 2e-16 *** UniqueCarrierNK 7.928e-01 2.702e-02 29.343 < 2e-16 *** UniqueCarrierOO 4.001e-01 2.019e-02 19.817 < 2e-16 *** UniqueCarrierUA 3.982e-01 1.827e-02 21.795 < 2e-16 *** UniqueCarrierVX 9.723e-02 3.690e-02 2.635 0.008423 ** UniqueCarrierWN 6.358e-01 1.700e-02 37.406 < 2e-16 *** dowTue 1.365e-01 1.313e-02 10.395 < 2e-16 *** dowWed 1.724e-01 1.242e-02 13.877 < 2e-16 *** dowThu 4.593e-02 1.256e-02 3.656 0.000256 *** dowFri -2.338e-01 1.311e-02 -17.837 < 2e-16 *** dowSat -2.413e-01 1.458e-02 -16.556 < 2e-16 *** dowSun -3.028e-01 1.408e-02 -21.511 < 2e-16 *** OriginABI -3.355e-01 2.554e-01 -1.314 0.188965 ...
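The same L-BFGS-based optimization is also exposed directly in MLlib's Scala API. The following is only a rough sketch: the one-hot encoding of the categorical variables is omitted, and the tiny labeledPoints RDD is made up for illustration:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// In a real pipeline, the label would be ArrDel15 and the features would be
// the encoded UniqueCarrier/DayOfWeek/Origin/Dest indicators.
val labeledPoints = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(0.0, 1.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 1.0))
))

val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(labeledPoints)

println(model.weights)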
The other parameters of the SparkR glm implementation are shown in the following table:
Schema on read is one of the convenient features of big data. The DataFrame class has the ability to figure out the schema of a text file containing one JSON record per line:
[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ cat examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ bin/sparkR
...
> people = read.json(sqlContext, "examples/src/main/resources/people.json")
> dtypes(people)
[[1]]
[1] "age"    "bigint"

[[2]]
[1] "name"   "string"

> schema(people)
StructType
|-name = "age", type = "LongType", nullable = TRUE
|-name = "name", type = "StringType", nullable = TRUE
> showDF(people)
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
As we mentioned in the previous chapter, the Parquet format is an efficient storage format, particularly for low-cardinality columns. Parquet files can be read/written directly from R:
> write.parquet(sparkDf, "parquet")
You can see that the new Parquet file is 66 times smaller than the original zip file downloaded from the DoT:
[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ ls -l On_Time_On_Time_Performance_2015_7.zip parquet/ flights/
-rw-r--r--  1 akozlov  staff  26204213 Sep  9 12:21 /Users/akozlov/spark/On_Time_On_Time_Performance_2015_7.zip

flights/:
total 459088
-rw-r--r--  1 akozlov  staff  235037955 Sep  9 12:20 On_Time_On_Time_Performance_2015_7.csv
-rw-r--r--  1 akozlov  staff      12054 Sep  9 12:20 readme.html

parquet/:
total 848
-rw-r--r--  1 akozlov  staff       0 Jan 24 22:50 _SUCCESS
-rw-r--r--  1 akozlov  staff   10000 Jan 24 22:50 _common_metadata
-rw-r--r--  1 akozlov  staff   23498 Jan 24 22:50 _metadata
-rw-r--r--  1 akozlov  staff  394418 Jan 24 22:50 part-r-00000-9e2d0004-c71f-4bf5-aafe-90822f9d7223.gz.parquet
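Since Parquet is a language-neutral format, the files written from SparkR above can be read back directly on the Scala side; a minimal sketch for the Spark 1.6 spark-shell, where sqlContext is predefined:

// Read the Parquet directory produced by write.parquet() in SparkR.
val parquetDf = sqlContext.read.parquet("parquet")

// The schema is stored with the data, so no separate metadata is required.
parquetDf.printSchema()
parquetDf.groupBy("UniqueCarrier").count().show(10)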
Let's assume that one has an exceptional implementation of a numeric method in Scala that we want to call from R. One way of doing this would be to use the R system() function, which invokes /bin/sh on Unix-like systems. However, the rscala package is a more efficient way: it starts a Scala interpreter and maintains communication with it over a TCP/IP network connection.
The Scala interpreter maintains its state (memoization) between the calls; one can also define functions, as follows:
R> scala <- scalaInterpreter() R> scala %~% 'def pri(i: Stream[Int]): Stream[Int] = i.head #:: pri(i.tail filter { x => { println("Evaluating " + x + "%" + i.head); x % i.head != 0 } } )' ScalaInterpreterReference... engine: javax.script.ScriptEngine R> scala %~% 'val primes = pri(Stream.from(2))' ScalaInterpreterReference... primes: Stream[Int] R> scala %~% 'primes take 5 foreach println' 2 Evaluating 3%2 3 Evaluating 4%2 Evaluating 5%2 Evaluating 5%3 5 Evaluating 6%2 Evaluating 7%2 Evaluating 7%3 Evaluating 7%5 7 Evaluating 8%2 Evaluating 9%2 Evaluating 9%3 Evaluating 10%2 Evaluating 11%2 Evaluating 11%3 Evaluating 11%5 Evaluating 11%7 11 R> scala %~% 'primes take 5 foreach println' 2 3 5 7 11 R> scala %~% 'primes take 7 foreach println' 2 3 5 7 11 Evaluating 12%2 Evaluating 13%2 Evaluating 13%3 Evaluating 13%5 Evaluating 13%7 Evaluating 13%11 13 Evaluating 14%2 Evaluating 15%2 Evaluating 15%3 Evaluating 16%2 Evaluating 17%2 Evaluating 17%3 Evaluating 17%5 Evaluating 17%7 Evaluating 17%11 Evaluating 17%13 17 R>
R can be invoked from Scala using the ! or !! Scala process operators and the Rscript command:
[akozlov@Alexanders-MacBook-Pro ~]$ cat << EOF > rdate.R
> #!/usr/local/bin/Rscript
>
> write(date(), stdout())
> EOF
[akozlov@Alexanders-MacBook-Pro ~]$ chmod a+x rdate.R
[akozlov@Alexanders-MacBook-Pro ~]$ scala
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import sys.process._
import sys.process._

scala> val date = Process(Seq("./rdate.R")).!!
date: String =
"Wed Feb 24 02:20:09 2016
"
A more efficient way is to use a similar TCP/IP binary transport protocol to communicate with R via RSclient/Rserve (http://www.rforge.net/Rserve). To start Rserve on a node that has R installed, perform the following actions:
[akozlov@Alexanders-MacBook-Pro ~]$ wget http://www.rforge.net/Rserve/snapshot/Rserve_1.8-5.tar.gz
...
[akozlov@Alexanders-MacBook-Pro ~]$ R CMD INSTALL Rserve_1.8-5.tar.gz
...
[akozlov@Alexanders-MacBook-Pro ~]$ R -q CMD Rserve

R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-apple-darwin13.4.0 (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Rserv started in daemon mode.
By default, Rserve opens a connection on localhost:6311. The advantage of the binary network protocol is that it is platform-independent, and multiple clients can communicate with the server. The clients can connect to Rserve from a variety of languages, including Java and Scala.
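For example, the Java client that ships with Rserve (the REngine/Rserve client library; the exact build coordinates are an assumption here, so check the Rserve site for the current ones) can be used from Scala roughly as follows:

import org.rosuda.REngine.Rserve.RConnection

// Connect to the Rserve daemon started above (localhost:6311 by default).
val connection = new RConnection()
try {
  // Evaluate an R expression and pull the result back as a Scala Double.
  val mean = connection.eval("mean(rnorm(1000))").asDouble()
  println(s"mean of 1000 standard normal draws: $mean")

  // Vectors come back as arrays.
  val quantiles = connection.eval("quantile(rnorm(1000), c(0.05, 0.95))").asDoubles()
  println(quantiles.mkString(", "))
} finally {
  connection.close()
}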
Note that, while passing the results as a binary object has its advantages, you have to be careful with the type mappings between R and Scala. Rserve supports other clients, including Python, but I will also cover JSR 223-compliant scripting at the end of this chapter.