Python testing of Spark is very similar in concept, but the testing libraries are a bit different. PySpark uses both doctest
and unittest
to test itself. The doctest
library makes it easy to create tests based on the expected output of code run in the Python interpreter. We can run the tests by running pyspark -m doctest [path to code]
. By taking the wordcount.py
example from Spark and factoring out countWords
, you can test the word count functionality using doctest
:
""" >>> from pyspark.context import SparkContext >>> sc = SparkContext('local', 'test') >>> b = sc.parallelize(["pandas are awesome", "and ninjas are also awesome"]) >>> countWords(b) [('also', 1), ('and', 1), ('are', 2), ('awesome', 2), ('ninjas', 1), ('pandas', 1)] """ import sys from operator import add from pyspark import SparkContext def countWords(lines): counts = lines.flatMap(lambda x: x.split(' ')) .map(lambda x: (x, 1)) .reduceByKey(add) return sorted(counts.collect()) if __name__ == "__main__": if len(sys.argv) < 3: print >> sys.stderr, "Usage: PythonWordCount<master> <file>" exit(-1) sc = SparkContext(sys.argv[1], "PythonWordCount") lines = sc.textFile(sys.argv[2], 1) output = countWords(lines) for (word, count) in output: print "%s : %i" % (word, count)
We can also test something similar to our Java and Scala programs like so:
""" >>> from pyspark.context import SparkContext >>> sc = SparkContext('local', 'test') >>> b = sc.parallelize(["1,2","1,3"]) >>> handleInput(b) [3, 4] """ import sys from operator import add from pyspark import SparkContext def handleInput(lines): data = lines.map(lambda x: sum(map(int, x.split(',')))) return sorted(data.collect()) if __name__ == "__main__": if len(sys.argv) < 3: print >> sys.stderr, "Usage: PythonLoadCsv<master> <file>" exit(-1) sc = SparkContext(sys.argv[1], "PythonLoadCsv") lines = sc.textFile(sys.argv[2], 1) output = handleInput(lines) for sum in output: print sum
18.226.4.191