Python has steadily established itself as a de facto tool for data science. It has a command-line interface and decent visualization via matplotlib and ggplot, which is based on R's ggplot2. Recently, Wes McKinney, the creator of Pandas, the time series data-analysis package, joined Cloudera to pave the way for Python in big data.
Python is usually part of the default OS installation; Spark requires Python version 2.7.0 or higher.
If you don't have Python on Mac OS, I recommend installing the Homebrew package manager from http://brew.sh:
[akozlov@Alexanders-MacBook-Pro spark(master)]$ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
==> This script will install:
/usr/local/bin/brew
/usr/local/Library/...
/usr/local/share/man/man1/brew.1
…
[akozlov@Alexanders-MacBook-Pro spark(master)]$ brew install python
…
Otherwise, on a Unix-like system, Python can be compiled from the source distribution:
$ export PYTHON_VERSION=2.7.11
$ wget -O - https://www.python.org/ftp/python/$PYTHON_VERSION/Python-$PYTHON_VERSION.tgz | tar xzvf -
$ cd $HOME/Python-$PYTHON_VERSION
$ ./configure --prefix=/usr/local --enable-unicode=ucs4 --enable-shared LDFLAGS="-Wl,-rpath /usr/local/lib"
$ make; sudo make altinstall
$ sudo ln -sf /usr/local/bin/python2.7 /usr/local/bin/python
It is good practice to place it in a directory different from the default Python installation. It is normal to have multiple versions of Python on a single system, and this usually does not lead to problems, as Python separates the installation directories. For the purposes of this chapter, as for many machine learning tasks, I'll also need a few packages. The packages and specific versions may differ across installations:
$ wget https://bootstrap.pypa.io/ez_setup.py
$ sudo /usr/local/bin/python ez_setup.py
$ sudo /usr/local/bin/easy_install-2.7 pip
$ sudo /usr/local/bin/pip install --upgrade avro nose numpy scipy pandas statsmodels scikit-learn iso8601 python-dateutil python-snappy
If everything compiles (SciPy uses a Fortran compiler and libraries for linear algebra), we are ready to use Python 2.7.11!
Just as bin/sparkR launches R with a preloaded Spark context, bin/pyspark launches a Python shell with a preloaded Spark context and a running Spark driver. The PYSPARK_PYTHON environment variable can be used to point to a specific Python version:
[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ export PYSPARK_PYTHON=/usr/local/bin/python
[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ bin/pyspark
Python 2.7.11 (default, Jan 23 2016, 20:14:24)
[GCC 4.2.1 Compatible Apple LLVM 7.0.2 (clang-700.1.81)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.11 (default, Jan 23 2016 20:14:24)
SparkContext available as sc, HiveContext available as sqlContext.
>>>
PySpark directly supports most of the MLlib functionality on Spark RDDs (http://spark.apache.org/docs/latest/api/python), but it is known to lag a few releases behind the Scala API (http://spark.apache.org/docs/latest/api/scala). As of release 1.6.0, it also supports DataFrames (http://spark.apache.org/docs/latest/sql-programming-guide.html):
>>> import pyspark.sql.functions as func
>>> sfoFlights = sqlContext.sql("SELECT Dest, UniqueCarrier, ArrDelayMinutes FROM parquet.parquet")
>>> sfoFlights.groupBy(["Dest", "UniqueCarrier"]).agg(func.avg("ArrDelayMinutes"), func.count("ArrDelayMinutes")).sort("avg(ArrDelayMinutes)", ascending=False).head(5)
[Row(Dest=u'HNL', UniqueCarrier=u'HA', avg(ArrDelayMinutes)=53.70967741935484, count(ArrDelayMinutes)=31), Row(Dest=u'IAH', UniqueCarrier=u'F9', avg(ArrDelayMinutes)=43.064516129032256, count(ArrDelayMinutes)=31), Row(Dest=u'LAX', UniqueCarrier=u'DL', avg(ArrDelayMinutes)=39.68691588785047, count(ArrDelayMinutes)=214), Row(Dest=u'LAX', UniqueCarrier=u'WN', avg(ArrDelayMinutes)=29.704453441295545, count(ArrDelayMinutes)=247), Row(Dest=u'MSO', UniqueCarrier=u'OO', avg(ArrDelayMinutes)=29.551724137931036, count(ArrDelayMinutes)=29)]
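For comparison, here is a sketch of an equivalent query typed into the Scala spark-shell, assuming the same parquet directory is in place (the functions import brings avg, count, and desc into scope):

import org.apache.spark.sql.functions.{avg, count, desc}

val sfoFlights = sqlContext.sql("SELECT Dest, UniqueCarrier, ArrDelayMinutes FROM parquet.parquet")
sfoFlights.groupBy("Dest", "UniqueCarrier")
  .agg(avg("ArrDelayMinutes"), count("ArrDelayMinutes"))
  .sort(desc("avg(ArrDelayMinutes)"))
  .show(5)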
As this is really a book about Scala, we should also mention that one can call Python code and its interpreter directly from Scala (or Java). There are a few options available that will be discussed in this chapter.
Scala, as well as Java, can call OS processes by spawning a separate thread, which we already used for interactive analysis in Chapter 1, Exploratory Data Analysis: the .! method will start the process and return the exit code, while .!! will return a string that contains the output:
scala> import sys.process._
import sys.process._

scala> val retCode = Process(Seq("/usr/local/bin/python", "-c", "import socket; print(socket.gethostname())")).!
Alexanders-MacBook-Pro.local
retCode: Int = 0

scala> val lines = Process(Seq("/usr/local/bin/python", "-c", """from datetime import datetime, timedelta; print("Yesterday was {}".format(datetime.now()-timedelta(days=1)))""")).!!
lines: String =
"Yesterday was 2016-02-12 16:24:53.161853
"
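Besides .! and .!!, sys.process can also consume a process's output lazily, line by line, which is handy for long-running commands. A minimal sketch (the lineStream method is available since Scala 2.11):

import sys.process._

// lines become available as the child process emits them
val stream = Process(Seq("/usr/local/bin/python", "-c",
  "for i in range(3): print(i)")).lineStream
stream.foreach(println)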
Let's try a more complex SVD computation (similar to the one we used in the SVD++ recommendation engine, but this time invoking the BLAS C libraries in the backend). I created a Python executable that takes a string representing a matrix and the required rank as input, and outputs an SVD approximation of the provided rank:
#!/usr/bin/env python

import sys
import os
import numpy as np
from scipy import linalg

np.set_printoptions(linewidth=10000)

def process_line(input):
    # each input line is "matrix|rank"; the rank defaults to 1
    inp = input.rstrip("\n")
    if len(inp) > 1:
        try:
            (mat, rank) = inp.split("|")
            a = np.matrix(mat)
            r = int(rank)
        except:
            a = np.matrix(inp)
            r = 1
        # zero out all singular values beyond the requested rank
        U, s, Vh = linalg.svd(a, full_matrices=False)
        for i in xrange(r, s.size):
            s[i] = 0
        S = linalg.diagsvd(s, s.size, s.size)
        # print the reconstructed matrix on one line, rows separated by ";"
        print(str(np.dot(U, np.dot(S, Vh))).replace(os.linesep, ";"))

if __name__ == '__main__':
    map(process_line, sys.stdin)
Let's call it svd.py and put it in the current directory. Given a matrix and a rank as input, it produces an approximation of the given rank:
$ echo -e "1,2,3;2,1,2;3,2,1;7,8,9|3" | ./svd.py
[[ 1. 2. 3.]; [ 2. 1. 2.]; [ 3. 2. 1.]; [ 7. 8. 9.]]
To call it from Scala, let's define the following #<<< method in our DSL:
scala> implicit class RunCommand(command: String) {
     |   def #<<< (input: String)(implicit buffer: StringBuilder) = {
     |     val process = Process(command)
     |     val io = new ProcessIO (
     |       in  => { in.write(input getBytes "UTF-8"); in.close },
     |       out => { buffer append scala.io.Source.fromInputStream(out).getLines.mkString("\n"); buffer.append("\n"); out.close() },
     |       err => { scala.io.Source.fromInputStream(err).getLines.foreach(System.err.println) })
     |     (process run io).exitValue
     |   }
     | }
defined class RunCommand
Now, we can use the #<<< operator to call Python's SVD method:
scala> implicit val buffer = new StringBuilder()
buffer: StringBuilder =

scala> if ("./svd.py" #<<< "1,2,3;2,1,2;3,2,1;7,8,9|1" == 0) Some(buffer.toString) else None
res77: Option[String] = Some([[ 1.84716691 2.02576751 2.29557674]; [ 1.48971176 1.63375041 1.85134741]; [ 1.71759947 1.88367234 2.13455611]; [ 7.19431647 7.88992728 8.94077601]]
)
Note that as we requested the resulting matrix rank to be one, all rows and columns are linearly dependent. We can even pass several lines of input at a time, as follows:
scala> if ("./svd.py" #<<< """ | 1,2,3;2,1,2;3,2,1;7,8,9|0 | 1,2,3;2,1,2;3,2,1;7,8,9|1 | 1,2,3;2,1,2;3,2,1;7,8,9|2 | 1,2,3;2,1,2;3,2,1;7,8,9|3""" == 0) Some(buffer.toString) else None res80: Option[String] = Some([[ 0. 0. 0.]; [ 0. 0. 0.]; [ 0. 0. 0.]; [ 0. 0. 0.]] [[ 1.84716691 2.02576751 2.29557674]; [ 1.48971176 1.63375041 1.85134741]; [ 1.71759947 1.88367234 2.13455611]; [ 7.19431647 7.88992728 8.94077601]] [[ 0.9905897 2.02161614 2.98849663]; [ 1.72361156 1.63488399 1.66213642]; [ 3.04783513 1.89011928 1.05847477]; [ 7.04822694 7.88921926 9.05895373]] [[ 1. 2. 3.]; [ 2. 1. 2.]; [ 3. 2. 1.]; [ 7. 8. 9.]])
SVD is usually a fairly heavy computation, so the relative overhead of calling Python in this case is small. We can avoid even this overhead if we keep the process running and feed it several lines at a time, as we did in the last example. Both Hadoop MR and Spark implement this approach. For example, in Spark, the whole computation takes only one line, as shown in the following:
scala> sc.parallelize(List("1,2,3;2,1,2;3,2,1;7,8,9|0", "1,2,3;2,1,2;3,2,1;7,8,9|1", "1,2,3;2,1,2;3,2,1;7,8,9|2", "1,2,3;2,1,2;3,2,1;7,8,9|3"), 4).pipe("./svd.py").collect.foreach(println)
[[ 0. 0. 0.]; [ 0. 0. 0.]; [ 0. 0. 0.]; [ 0. 0. 0.]]
[[ 1.84716691 2.02576751 2.29557674]; [ 1.48971176 1.63375041 1.85134741]; [ 1.71759947 1.88367234 2.13455611]; [ 7.19431647 7.88992728 8.94077601]]
[[ 0.9905897 2.02161614 2.98849663]; [ 1.72361156 1.63488399 1.66213642]; [ 3.04783513 1.89011928 1.05847477]; [ 7.04822694 7.88921926 9.05895373]]
[[ 1. 2. 3.]; [ 2. 1. 2.]; [ 3. 2. 1.]; [ 7. 8. 9.]]
The whole pipeline is ready to be distributed across a cluster of multicore workstations! I suspect you are falling in love with Scala/Spark already.
Note that debugging pipelined executions can be tricky, as the data is passed from one process to another using OS pipes.
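One way to make such failures easier to diagnose is to run the script locally with a ProcessLogger that captures stdout and stderr separately, so Python tracebacks are not lost in the pipe. A minimal sketch:

import sys.process._
import java.io.ByteArrayInputStream

val out = new StringBuilder
val err = new StringBuilder
// collect stdout and stderr into separate buffers
val logger = ProcessLogger(line => out.append(line).append("\n"),
                           line => err.append(line).append("\n"))
val exitCode = (Process("./svd.py") #< new ByteArrayInputStream(
  "1,2,3;2,1,2;3,2,1;7,8,9|1".getBytes("UTF-8"))) ! logger
if (exitCode != 0) System.err.println(err)  // the Python traceback, if any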
For completeness, we need to mention Jython, a Java implementation of Python (as opposed to the more familiar C implementation, also called CPython). Jython avoids the problem of passing input/output via OS pipes by letting users compile Python source code to Java bytecode and run the resulting bytecode on any Java virtual machine. As Scala also runs in the Java virtual machine, it can use Jython classes directly, although the reverse is not true in general: Scala classes are sometimes not compatible with Java/Jython.
JSR 223
In this particular case, the request is for "Scripting for the Java™ Platform" and was originally filed on November 15, 2004 (https://www.jcp.org/en/jsr/detail?id=223). Initially, it targeted the ability of Java servlet applications to work with multiple scripting languages. The specification requires scripting language maintainers to provide a Java JAR with the corresponding implementation. Portability issues have hindered practical implementations, particularly where the platform requires complex interaction with the OS, such as dynamic linking in C or Fortran. Currently, only a handful of languages are supported, with R and Python among them, but in incomplete form.
Since Java 6, JSR 223: Scripting for Java has added the javax.script package, which allows multiple scripting languages to be called through the same API, as long as the language provides a script engine. To add the Jython scripting language, download the latest Jython JAR from the Jython site at http://www.jython.org/downloads.html:
$ wget -O jython-standalone-2.7.0.jar http://search.maven.org/remotecontent?filepath=org/python/jython-standalone/2.7.0/jython-standalone-2.7.0.jar
[akozlov@Alexanders-MacBook-Pro Scala]$ scala -cp jython-standalone-2.7.0.jar
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import javax.script.ScriptEngine;
...
scala> import javax.script.ScriptEngineManager;
...
scala> import javax.script.ScriptException;
...
scala> val manager = new ScriptEngineManager();
manager: javax.script.ScriptEngineManager = javax.script.ScriptEngineManager@3a03464

scala> val engines = manager.getEngineFactories();
engines: java.util.List[javax.script.ScriptEngineFactory] = [org.python.jsr223.PyScriptEngineFactory@4909b8da, jdk.nashorn.api.scripting.NashornScriptEngineFactory@68837a77, scala.tools.nsc.interpreter.IMain$Factory@1324409e]
Now, I can use the Jython/Python scripting engine:
scala> val engine = new ScriptEngineManager().getEngineByName("jython");
engine: javax.script.ScriptEngine = org.python.jsr223.PyScriptEngine@6094de13

scala> engine.eval("from datetime import datetime, timedelta; yesterday = str(datetime.now()-timedelta(days=1))")
res15: Object = null

scala> engine.get("yesterday")
res16: Object = 2016-02-12 23:26:38.012000
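Values can also be passed in the other direction with the engine's put method. A small sketch (the numDays name is arbitrary):

// bind a Java Integer into the interpreter's namespace
engine.put("numDays", 3: java.lang.Integer)
engine.eval("from datetime import datetime, timedelta; cutoff = str(datetime.now()-timedelta(days=numDays))")
println(engine.get("cutoff"))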
It is worth giving a disclaimer here that not all Python modules are available in Jython. Modules that require C/Fortran dynamic linkage for libraries that do not exist in Java are not likely to work in Jython. Specifically, NumPy and SciPy are not supported in Jython, as they rely on C/Fortran. If you discover some other missing module, you can try copying the .py file from a Python distribution to a directory on Jython's sys.path; if this works, consider yourself lucky.
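A quick way to check whether a given module is usable from Jython is to try importing it through the engine. A minimal sketch:

import javax.script.ScriptException

def jythonHas(module: String): Boolean =
  try { engine.eval(s"import $module"); true }
  catch { case _: ScriptException => false }

jythonHas("datetime")  // true
jythonHas("numpy")     // false: requires C/Fortran extensions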
Jython has the advantage of accessing Python's rich modules without starting a Python runtime on each call, which can result in significant performance savings:
scala> val startTime = System.nanoTime
startTime: Long = 54384084381087

scala> for (i <- 1 to 100) {
     |   engine.eval("from datetime import datetime, timedelta; yesterday = str(datetime.now()-timedelta(days=1))")
     |   val yesterday = engine.get("yesterday")
     | }

scala> val elapsed = 1e-9 * (System.nanoTime - startTime)
elapsed: Double = 0.270837934

scala> val startTime = System.nanoTime
startTime: Long = 54391560460133

scala> for (i <- 1 to 100) {
     |   val yesterday = Process(Seq("/usr/local/bin/python", "-c", """from datetime import datetime, timedelta; print(datetime.now()-timedelta(days=1))""")).!!
     | }

scala> val elapsed = 1e-9 * (System.nanoTime - startTime)
elapsed: Double = 2.221937263
The Jython JSR 223 call is almost 10 times faster!
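If the same script is evaluated many times, it can also be compiled once and reused via the javax.script.Compilable interface, which the Jython engine implements. A sketch under that assumption:

import javax.script.{Compilable, CompiledScript}

val compiled: CompiledScript = engine.asInstanceOf[Compilable].compile(
  "from datetime import datetime, timedelta; yesterday = str(datetime.now()-timedelta(days=1))")
for (i <- 1 to 100) compiled.eval()  // no re-parsing of the Python source
println(engine.get("yesterday"))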