Spark exercise - hands-on with Spark (Databricks)

This notebook is based on tutorials conducted by Databricks. The tutorial uses the Databricks Community Edition of Spark, which is available to sign up for on the Databricks website. Databricks is a leading provider of the commercial, enterprise-supported version of Spark.

In this tutorial, we will introduce a few basic commands used in Spark. Users are encouraged to try out more extensive Spark tutorials and notebooks that are available on the web for more detailed examples.

Documentation for Spark's Python API (PySpark) is available on the Apache Spark website.

The data for this book was imported into the Databricks Spark platform. For more information on importing data, see the "Importing Data" page of the Databricks documentation.

# The SparkContext/SparkSession is the entry point for all Spark operations
# sc = the SparkContext, i.e., the execution environment of Spark (only one per JVM)
# Note that SparkSession has been the entry point since Spark 2.0
# This tutorial uses SparkContext, the entry point used prior to Spark 2.0

from pyspark import SparkContext
# sc = SparkContext(appName = "some_application_name")  # You would normally run this, but Databricks has already created sc in the notebook environment

quote = "To be, or not to be, that is the question: Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune, Or to take Arms against a Sea of troubles, And by opposing end them: to die, to sleep No more; and by a sleep, to say we end the heart-ache, and the thousand natural shocks that Flesh is heir to? 'Tis a consummation devoutly to be wished. To die, to sleep, To sleep, perchance to Dream; aye, there's the rub, for in that sleep of death, what dreams may come, when we have shuffled off this mortal coil, must give us pause."

sparkdata = sc.parallelize(quote.split(' '))

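print("sparkdata = ", sparkdata)
print("sparkdata.collect = ", sparkdata.collect)  # No parentheses: prints the method object, nothing is executed
print("sparkdata.collect() = ", sparkdata.collect()[1:10])  # collect() is an action; prints elements 1 to 9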

# A simple transformation - map
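def mapword(word):
    return (word, 1)

rdd2 = sparkdata.map(mapword)  # map is a lazy transformation
print(rdd2)  # Nothing has happened here: this only prints the RDD's description
print(rdd2.collect()[1:10])  # collect causes the DAG to execute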
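Python 3's built-in `map()` is lazy in a similar way, so a plain-Python analogy (not Spark code) can make "nothing has happened here" concrete. The `calls` list is a hypothetical instrumentation helper that records each time `mapword` actually runs.

```python
# Plain-Python analogy (not Spark): Python 3's map() is lazy, like an RDD transformation
calls = []  # hypothetical helper: records every word mapword() actually processes

def mapword(word):
    calls.append(word)
    return (word, 1)

words = "to be or not to be".split(' ')
lazy_pairs = map(mapword, words)  # like sparkdata.map(mapword): no work is done yet
calls_before = len(calls)         # 0, because nothing has been evaluated

pairs = list(lazy_pairs)          # like collect(): forces every element to be computed
calls_after = len(calls)          # 6, one call per word

print(calls_before, calls_after)  # 0 6
print(pairs[:2])                  # [('to', 1), ('be', 1)]
```

One difference: a Python iterator can be consumed only once, whereas Spark recomputes the DAG each time an action runs unless the RDD is cached.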

# Another Transformation

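def charsmorethan2(tuple1):
    # Keep pairs whose word has more than 2 characters; otherwise return None implicitly
    if len(tuple1[0]) > 2:
        return tuple1

rdd3 = rdd2.map(lambda x: charsmorethan2(x))
# Multiple transformations in 1 statement (drop the None values, then count); nothing is happening yet
rdd4 = rdd3.filter(lambda x: x is not None).reduceByKey(lambda a, b: a + b)
# The DAG gets executed. Note that since we didn't remove punctuation marks, 'be,' etc. are also included
print(rdd4.collect())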
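The same map, filter and count steps can be traced in plain Python (not Spark) on the first line of the quote, with `collections.Counter` standing in for `reduceByKey()`. It shows why the `None` values from `charsmorethan2` have to be filtered out, and why `'be,'` survives the length filter: the trailing comma counts as a character.

```python
from collections import Counter

# Plain-Python trace (not Spark) of map -> filter -> reduceByKey
def mapword(word):
    return (word, 1)

def charsmorethan2(tuple1):
    # Returns the pair for words longer than 2 characters, otherwise None implicitly
    if len(tuple1[0]) > 2:
        return tuple1

text = "To be, or not to be, that is the question:"
pairs = [mapword(w) for w in text.split(' ')]
mapped = [charsmorethan2(p) for p in pairs]  # like rdd2.map(...): short words become None
kept = [p for p in mapped if p is not None]  # like .filter(lambda x: x is not None)

counts = Counter()
for word, one in kept:  # like .reduceByKey(lambda a, b: a + b)
    counts[word] += one

print(mapped[:2])    # [None, ('be,', 1)]
print(dict(counts))  # {'be,': 2, 'not': 1, 'that': 1, 'the': 1, 'question:': 1}
```

In Spark itself, `rdd2.filter(lambda x: len(x[0]) > 2)` would drop the short words in a single step without producing `None` values.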

# With Tables, a general example
cms = sc.parallelize([[1,"Dr. A",12.50,"Yale"],[2,"Dr. B",5.10,"Duke"],[3,"Dr. C",200.34,"Mt. Sinai"],[4,"Dr. D",5.67,"Duke"],[1,"Dr. E",52.50,"Yale"]])

def findPayment(data):
    return data[2]

print("Payments = ", cms.map(findPayment).collect())
print("Mean = ", cms.map(findPayment).mean())  # mean is an action
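As a sanity check on what `mean()` should return, the same arithmetic can be done in plain Python on the five payments (this is not Spark code):

```python
# Plain-Python check (not Spark) of the payments and their mean
cms = [[1, "Dr. A", 12.50, "Yale"], [2, "Dr. B", 5.10, "Duke"],
       [3, "Dr. C", 200.34, "Mt. Sinai"], [4, "Dr. D", 5.67, "Duke"],
       [1, "Dr. E", 52.50, "Yale"]]

def findPayment(data):
    return data[2]

payments = [findPayment(row) for row in cms]  # like cms.map(findPayment).collect()
mean = sum(payments) / len(payments)          # like cms.map(findPayment).mean()

print(payments)        # [12.5, 5.1, 200.34, 5.67, 52.5]
print(round(mean, 3))  # 55.222
```

The single large payment (200.34) pulls the mean well above the other four values.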

# Creating a DataFrame (a tabular structure familiar to users of pandas)
from pyspark.sql import functions as func

# spark is the SparkSession that Databricks predefines (older code used sqlContext here)
cms_df = spark.createDataFrame(cms, ["ID", "Name", "Payment", "Hosp"])
summary = cms_df.groupBy('Hosp').agg(func.avg('Payment'), func.max('Payment'), func.min('Payment'))
print(summary)  # Prints only the DataFrame's schema; the aggregation has not run yet
print(summary.collect())  # collect() executes the query and returns a list of Row objects
print("Converting to a Pandas DataFrame")
print("--------------------------------")
pd_df = summary.toPandas()
print(type(pd_df))
print(pd_df)
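To see what the aggregated DataFrame should contain, the group-by can be reproduced in plain Python with a dictionary (an illustration, not Spark code). Spark names the result columns `avg(Payment)`, `max(Payment)` and `min(Payment)` by default and does not guarantee the row order.

```python
from collections import defaultdict

# Plain-Python equivalent (not Spark) of groupBy('Hosp').agg(avg, max, min) on Payment
cms = [[1, "Dr. A", 12.50, "Yale"], [2, "Dr. B", 5.10, "Duke"],
       [3, "Dr. C", 200.34, "Mt. Sinai"], [4, "Dr. D", 5.67, "Duke"],
       [1, "Dr. E", 52.50, "Yale"]]

payments_by_hosp = defaultdict(list)
for _id, _name, payment, hosp in cms:
    payments_by_hosp[hosp].append(payment)

# hosp -> (avg, max, min)
summary = {
    hosp: (sum(p) / len(p), max(p), min(p))
    for hosp, p in payments_by_hosp.items()
}

for hosp, (avg, mx, mn) in summary.items():
    print(hosp, round(avg, 3), mx, mn)
# Yale 32.5 52.5 12.5
# Duke 5.385 5.67 5.1
# Mt. Sinai 200.34 200.34 200.34
```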

wordsList = ['to','be','or','not','to','be']
wordsRDD = sc.parallelize(wordsList, 3)  # Splits into 3 partitions
# Print out the type of wordsRDD
print(type(wordsRDD))

# glom coalesces all elements within each partition into a list
print(wordsRDD.glom().take(2))  # take is an action; it returns the first 2 elements of the glommed RDD, i.e., the lists for the first 2 partitions
print(wordsRDD.glom().collect())  # collect returns the lists for all partitions
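A plain-Python sketch of how a local list might be split into contiguous partitions, assuming that the i-th partition holds the elements from index `i * n // k` up to `(i + 1) * n // k`. The helper `slice_into_partitions` is hypothetical and not a Spark API; on the cluster, `glom().collect()` is the authoritative answer.

```python
# Hypothetical helper (not a Spark API): split a list into k contiguous slices,
# assuming the i-th partition holds elements [i*n//k, (i+1)*n//k)
def slice_into_partitions(items, k):
    n = len(items)
    return [items[i * n // k:(i + 1) * n // k] for i in range(k)]

wordsList = ['to', 'be', 'or', 'not', 'to', 'be']
partitions = slice_into_partitions(wordsList, 3)  # analogous to sc.parallelize(wordsList, 3).glom()

print(partitions[:2])  # like glom().take(2): [['to', 'be'], ['or', 'not']]
print(partitions)      # like glom().collect(): [['to', 'be'], ['or', 'not'], ['to', 'be']]
```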

# An example with changing the case of words
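# One way of writing the function: a named function
def makeUpperCase(word):
    return word.upper()

print(makeUpperCase('cat'))

upperRDD = wordsRDD.map(makeUpperCase)
print(upperRDD.collect())

# The same transformation written with a lambda
upperLambdaRDD = wordsRDD.map(lambda word: word.upper())
print(upperLambdaRDD.collect())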
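In plain Python the two styles are interchangeable in the same way; this analogy (not Spark code) uses the built-in `map()`:

```python
# Plain-Python analogy (not Spark): a named function and a lambda give the same result
def makeUpperCase(word):
    return word.upper()

words = ['to', 'be', 'or', 'not', 'to', 'be']
named = list(map(makeUpperCase, words))               # like wordsRDD.map(makeUpperCase)
inline = list(map(lambda word: word.upper(), words))  # like wordsRDD.map(lambda word: word.upper())

print(named)            # ['TO', 'BE', 'OR', 'NOT', 'TO', 'BE']
print(named == inline)  # True
```

A named function is easier to test and reuse; a lambda keeps a one-off transformation next to where it is used.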

# Pair RDDs
wordPairs = wordsRDD.map(lambda word: (word, 1))
print(wordPairs.collect())

# Part 2: Counting with pair RDDs
# There are multiple ways of performing group-by operations in Spark
# One such method is groupByKey()
# ** Using groupByKey() **
# groupByKey() operates on a pair RDD such as wordPairs, where each word has already been paired with the value 1 for the word count. It gathers all values that share a key into a single iterable, e.g., ('to', [1, 1]). Every pair is shuffled across the network and all values for a key are held together, which can be quite memory intensive, especially if the dataset is large.

# Using groupByKey
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
    print('{0}: {1}'.format(key, list(value)))

# Summation of the key values (to get the word count)
wordCountsGrouped = wordsGrouped.map(lambda kv: (kv[0], sum(kv[1])))
print(wordCountsGrouped.collect())
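What `groupByKey()` followed by the summation produces can be mimicked in plain Python with a dictionary of lists (an analogy only; Spark does not guarantee the order of keys returned by `collect()`):

```python
# Plain-Python analogy (not Spark) of groupByKey() followed by a per-key sum
wordPairs = [(w, 1) for w in ['to', 'be', 'or', 'not', 'to', 'be']]

grouped = {}
for key, value in wordPairs:  # like wordPairs.groupByKey(): every value is kept
    grouped.setdefault(key, []).append(value)

counts = {key: sum(values) for key, values in grouped.items()}  # like the map(... sum ...) step

print(grouped)  # {'to': [1, 1], 'be': [1, 1], 'or': [1], 'not': [1]}
print(counts)   # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

`grouped` holds every individual 1 before anything is summed, which is where the memory cost of groupByKey() comes from on large datasets.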

# ** (2c) Counting using reduceByKey **
# reduceByKey() also returns a new pair RDD, but it merges the values of each key with the given function, which should be associative and commutative. The function is applied first within each partition (a map-side combine) and then across partitions after the shuffle, so usually far less data moves across the network than with groupByKey().

wordCounts = wordPairs.reduceByKey(lambda a, b: a + b)
print(wordCounts.collect())
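The per-partition combine followed by a cross-partition merge can be sketched in plain Python (not Spark), assuming the three partitions of `wordsRDD` hold `['to', 'be']`, `['or', 'not']` and `['to', 'be']`. The helper `combine` is hypothetical and only mirrors the `lambda a, b: a + b` passed to `reduceByKey()`.

```python
from functools import reduce

# Plain-Python sketch (not Spark) of reduceByKey(lambda a, b: a + b) in two stages,
# assuming wordsRDD's three partitions hold these words
partitions = [['to', 'be'], ['or', 'not'], ['to', 'be']]

def combine(partial, pair):
    # Hypothetical helper: fold one (word, count) pair into a dict of running sums
    word, count = pair
    partial[word] = partial.get(word, 0) + count
    return partial

# Stage 1: combine values per key inside each partition (map-side combine)
per_partition = [reduce(combine, [(w, 1) for w in part], {}) for part in partitions]

# Stage 2: merge the partial sums across partitions (what happens after the shuffle)
totals = reduce(combine, [pair for partial in per_partition for pair in partial.items()], {})

print(per_partition)  # [{'to': 1, 'be': 1}, {'or': 1, 'not': 1}, {'to': 1, 'be': 1}]
print(totals)         # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

Only one partial sum per word per partition reaches stage 2, rather than every individual 1, which is why reduceByKey() generally shuffles less data than groupByKey().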

# ** Combining all of the above into a single statement **

wordCountsCollected = (wordsRDD
                       .map(lambda word: (word, 1))
                       .reduceByKey(lambda a, b: a + b)
                       .collect())
print(wordCountsCollected)

# This tutorial has provided a basic overview of Spark and introduced the Databricks Community Edition, where users can upload and execute their own Spark notebooks. Many in-depth Spark tutorials are available on the web and from Databricks, and users are encouraged to explore them to learn more about Spark.
