pyspark-notebook Docker Stack

In this section, you’ll create and run a Spark streaming application in which you’ll receive a stream of tweets on the topic(s) you specify and summarize the top-20 hashtags in a bar chart that updates every 10 seconds. For the purposes of this example, you’ll use the Jupyter Docker container from the first Spark example.
There are two parts to this example. First, using the techniques from the “Data Mining Twitter” chapter, you’ll create a script that streams tweets from Twitter. Then, we’ll use Spark streaming in a Jupyter Notebook to read the tweets and summarize the hashtags.
The two parts will communicate with one another via networking sockets—a low-level view of client/server networking in which a client app communicates with a server app over a network using techniques similar to file I/O. A program can read from a socket or write to a socket similarly to reading from a file or writing to a file. The socket represents one endpoint of a connection. In this case, the client will be a Spark application, and the server will be a script that receives streaming tweets and sends them to the Spark app.
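The socket workflow described above can be sketched with the Python Standard Library alone: the server binds, listens, and accepts; the client connects and reads. This is an illustrative miniature, not part of the example’s code; port 0 asks the operating system for any free port:

```python
import socket
import threading

server = socket.socket()
server.bind(('localhost', 0))  # port 0: let the OS choose a free port
server.listen()                # wait for a client to connect
port = server.getsockname()[1]

def serve():
    # accept blocks until a client connects, then returns a new
    # socket for communicating with that client plus its address
    connection, address = server.accept()
    connection.send('hello from server'.encode('utf-8'))
    connection.close()

thread = threading.Thread(target=serve)
thread.start()

client = socket.socket()
client.connect(('localhost', port))
message = client.recv(1024).decode('utf-8')
client.close()
thread.join()
server.close()
print(message)  # hello from server
```

In the tweet-streaming example, the roles are the same: `starttweetstream.py` plays the server above, and the Spark application plays the client.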
For this example, you’ll install the Tweepy library into the Jupyter Docker container. Follow Section 17.6.2’s instructions for launching the container and installing Python libraries into it. Use the following command to install Tweepy:
pip install tweepy
The script starttweetstream.py contains a modified version of the TweetListener class from the “Data Mining Twitter” chapter. It streams the specified number of tweets and sends them to a socket on the local computer. When the tweet limit is reached, the script closes the socket. You’ve already used Twitter streaming, so we’ll focus only on what’s new. Ensure that the file keys.py (in the ch17 folder’s SparkHashtagSummarizer subfolder) contains your Twitter credentials.
In this example, you’ll use JupyterLab’s Terminal window to execute starttweetstream.py in one tab, then use a notebook to perform the Spark task in another tab. With the Jupyter pyspark-notebook Docker container running, open

http://localhost:8888/lab

in your web browser. In JupyterLab, select File > New > Terminal to open a new tab containing a Terminal. This is a Linux-based command line. Typing the ls command and pressing Enter lists the current folder’s contents. By default, you’ll see the container’s work folder.
To execute starttweetstream.py, you must first navigate to the SparkHashtagSummarizer folder with the command:

cd work/SparkHashtagSummarizer
You can now execute the script with a command of the form
ipython starttweetstream.py number_of_tweets search_terms
where number_of_tweets specifies the total number of tweets to process and search_terms is one or more space-separated strings to use for filtering tweets. For example, the following command would stream 1000 tweets about football:
ipython starttweetstream.py 1000 football
At this point, the script will display "Waiting for connection" and will wait until Spark connects to begin streaming the tweets.
starttweetstream.py import Statements

For discussion purposes, we’ve divided starttweetstream.py into pieces. First, we import the modules used in the script. The Python Standard Library’s socket module provides the capabilities that enable Python apps to communicate via sockets.
1 # starttweetstream.py
2 """Script to get tweets on topic(s) specified as script argument(s)
3 and send tweet text to a socket for processing by Spark."""
4 import keys
5 import socket
6 import sys
7 import tweepy
8
TweetListener
Once again, you’ve seen most of the code in class TweetListener, so we focus only on what’s new here:
Method __init__ (lines 12–17) now receives a connection parameter representing the socket and stores it in the self.connection attribute. We use this socket to send the hashtags to the Spark application.
In method on_status (lines 24–44), lines 27–32 extract the hashtags from the Tweepy Status object, convert them to lowercase and create a space-separated string of the hashtags to send to Spark. The key statement is line 39:

self.connection.send(hashtags_string.encode('utf-8'))

which uses the connection object’s send method to send the tweet text to whatever application is reading from that socket. Method send expects as its argument a sequence of bytes. The string method call encode('utf-8') converts the string to bytes. Spark will automatically read the bytes and reconstruct the strings.
9 class TweetListener(tweepy.StreamListener):
10 """Handles incoming Tweet stream."""
11
12 def __init__(self, api, connection, limit=10000):
13 """Create instance variables for tracking number of tweets."""
14 self.connection = connection
15 self.tweet_count = 0
16 self.TWEET_LIMIT = limit # 10,000 by default
17 super().__init__(api) # call superclass's init
18
19 def on_connect(self):
20 """Called when your connection attempt is successful, enabling
21 you to perform appropriate application tasks at that point."""
22 print('Successfully connected to Twitter ')
23
24 def on_status(self, status):
25 """Called when Twitter pushes a new tweet to you."""
26 # get the hashtags
27 hashtags = []
28
29 for hashtag_dict in status.entities['hashtags']:
30 hashtags.append(hashtag_dict['text'].lower())
31
32 hashtags_string = ' '.join(hashtags) + ' '
33 print(f'Screen name: {status.user.screen_name}:')
34 print(f' Hashtags: {hashtags_string}')
35 self.tweet_count += 1 # track number of tweets processed
36
37 try:
38 # send requires bytes, so encode the string in utf-8 format
39 self.connection.send(hashtags_string.encode('utf-8'))
40 except Exception as e:
41 print(f'Error: {e}')
42
43 # if TWEET_LIMIT is reached, return False to terminate streaming
44 return self.tweet_count != self.TWEET_LIMIT
45
46 def on_error(self, status):
47 print(status)
48 return True
49
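The hashtag-handling logic in on_status (lines 27–39) can be tried outside of Tweepy by hand-building a dictionary shaped like the entities metadata a Status object carries (the sample entities below are made up for illustration):

```python
# made-up entities metadata, shaped like status.entities
entities = {'hashtags': [{'text': 'Spark'}, {'text': 'BigData'}]}

hashtags = []
for hashtag_dict in entities['hashtags']:
    hashtags.append(hashtag_dict['text'].lower())  # as in line 30

hashtags_string = ' '.join(hashtags) + ' '  # as in line 32
payload = hashtags_string.encode('utf-8')   # send requires bytes
print(payload)  # b'spark bigdata '
```

Calling decode('utf-8') on the payload reverses the encoding, which is effectively what Spark does when it reconstructs the strings on the other end of the socket.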
Lines 50–80 execute when you run the script. You’ve connected to Twitter to stream tweets previously, so here we discuss only what’s new in this example.
Line 51 gets the number of tweets to process by converting the command-line argument sys.argv[1] to an integer. Recall that element 0 represents the script’s name.
50 if __name__ == '__main__':
51 tweet_limit = int(sys.argv[1]) # get maximum number of tweets
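You can simulate the command-line handling in line 51 by assigning sys.argv directly; the values below stand in for running ipython starttweetstream.py 1000 football:

```python
import sys

# stand-in for the real command-line arguments
sys.argv = ['starttweetstream.py', '1000', 'football']

tweet_limit = int(sys.argv[1])  # element 0 is the script's name
search_terms = sys.argv[2:]     # remaining arguments filter the stream
print(tweet_limit, search_terms)  # 1000 ['football']
```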
Line 52 calls the socket module’s socket function, which returns a socket object that we’ll use to wait for a connection from the Spark application.
52 client_socket = socket.socket() # create a socket
53
Line 55 calls the socket object’s bind method with a tuple containing the hostname or IP address of the computer and the port number on that computer. Together these represent where this script will wait for an initial connection from another app:
54 # app will use localhost (this computer) port 9876
55 client_socket.bind(('localhost', 9876))
56
Line 58 calls the socket’s listen method, which causes the script to wait until a connection is received. This is the statement that prevents the Twitter stream from starting until the Spark application connects.
57 print('Waiting for connection')
58 client_socket.listen() # wait for client to connect
59
Once the Spark application connects, line 61 calls socket method accept, which accepts the connection. This method returns a tuple containing a new socket object that the script will use to communicate with the Spark application and the IP address of the Spark application’s computer.
60 # when connection received, get connection/client address
61 connection, address = client_socket.accept()
62 print(f'Connection received from {address}')
63
Next, we authenticate with Twitter and start the stream. Lines 73–74 set up the stream, passing the socket object connection to the TweetListener so that it can use the socket to send hashtags to the Spark application.
64 # configure Twitter access
65 auth = tweepy.OAuthHandler(keys.consumer_key, keys.consumer_secret)
66 auth.set_access_token(keys.access_token, keys.access_token_secret)
67
68 # configure Tweepy to wait if Twitter rate limits are reached
69 api = tweepy.API(auth, wait_on_rate_limit=True,
70 wait_on_rate_limit_notify=True)
71
72 # create the Stream
73 twitter_stream = tweepy.Stream(api.auth,
74 TweetListener(api, connection, tweet_limit))
75
76 # sys.argv[2] is the first search term
77 twitter_stream.filter(track=sys.argv[2:])
78
Finally, lines 79–80 call the close method on the socket objects to release their resources.
79 connection.close()
80 client_socket.close()
In this section, you’ll use Spark streaming to read the hashtags sent via a socket by the script starttweetstream.py and summarize the results. You can either create a new notebook and enter the code you see here or load the hashtagsummarizer.ipynb notebook we provide in the ch17 examples folder’s SparkHashtagSummarizer subfolder.
First, let’s import the libraries used in this notebook. We’ll explain the pyspark classes as we use them. From IPython, we import the display module, which contains classes and utility functions that you can use in Jupyter. In particular, we’ll use the clear_output function to remove an existing chart before displaying a new one:
[1]: from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
This Spark application summarizes hashtags in 10-second batches. After processing each batch, it displays a Seaborn barplot. The IPython magic %matplotlib inline indicates that Matplotlib-based graphics should be displayed in the notebook rather than in their own windows. Recall that Seaborn uses Matplotlib.
We’ve used several IPython magics throughout the book. There are many magics specifically for use in Jupyter Notebooks. For the complete list of magics see:
https://ipython.readthedocs.io/en/stable/interactive/magics.html
SparkSession

As you’ll soon see, you can use Spark SQL to query data in resilient distributed datasets (RDDs). Spark SQL uses a Spark DataFrame to get a table view of the underlying RDDs. A SparkSession (module pyspark.sql) is used to create a DataFrame from an RDD.
There can be only one SparkSession object per Spark application. The following function, which we borrowed from the Spark Streaming Programming Guide, defines the correct way to get a SparkSession instance if it already exists or to create one if it does not yet exist:
[2]: def getSparkSessionInstance(sparkConf):
         """Spark Streaming Programming Guide's recommended method
            for getting an existing SparkSession or creating a new one."""
         if ("sparkSessionSingletonInstance" not in globals()):
             globals()["sparkSessionSingletonInstance"] = SparkSession \
                 .builder \
                 .config(conf=sparkConf) \
                 .getOrCreate()
         return globals()["sparkSessionSingletonInstance"]
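The get-or-create pattern this function uses, caching a single instance in globals(), is easy to see in plain Python. In this sketch, object() is a stand-in for the expensive SparkSession creation:

```python
def get_singleton_instance():
    # create the cached object only on the first call
    if 'singletonInstance' not in globals():
        globals()['singletonInstance'] = object()  # stand-in for SparkSession
    return globals()['singletonInstance']

first = get_singleton_instance()
second = get_singleton_instance()
print(first is second)  # True: every call returns the same object
```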
We call function display_barplot after Spark processes each batch of hashtags. Each call clears the previous Seaborn barplot, then displays a new one based on the Spark DataFrame it receives. First, we call the Spark DataFrame’s toPandas method to convert it to a pandas DataFrame for use with Seaborn. Next, we call the clear_output function from the IPython.display module. The keyword argument wait=True indicates that the function should remove the prior graph (if there is one), but only once the new graph is ready to display. The rest of the code in the function uses standard Seaborn techniques we’ve shown previously. The function call sns.color_palette('cool', 20) selects twenty colors from the Matplotlib 'cool' color palette:
[3]: def display_barplot(spark_df, x, y, time, scale=2.0, size=(16, 9)):
         """Displays a Spark DataFrame's contents as a bar plot."""
         df = spark_df.toPandas()
         # remove prior graph when new one is ready to display
         display.clear_output(wait=True)
         print(f'TIME: {time}')
         # create and configure a Figure containing a Seaborn barplot
         plt.figure(figsize=size)
         sns.set(font_scale=scale)
         barplot = sns.barplot(data=df, x=x, y=y,
             palette=sns.color_palette('cool', 20))
         # rotate the x-axis labels 90 degrees for readability
         for item in barplot.get_xticklabels():
             item.set_rotation(90)
         plt.tight_layout()
         plt.show()
In Spark streaming, a DStream is a sequence of RDDs, each representing a mini-batch of data to process. As you’ll soon see, you can specify a function that is called to perform a task for every RDD in the stream. In this app, the function count_tags will summarize the hashtag counts in a given RDD, add them to the current totals (maintained by the SparkSession), then display an updated top-20 barplot so that we can see how the top-20 hashtags are changing over time. For discussion purposes, we’ve broken this function into smaller pieces. First, we get the SparkSession by calling the utility function getSparkSessionInstance with the SparkContext’s configuration information. Every RDD has access to the SparkContext via the context attribute:
[4]: def count_tags(time, rdd):
         """Count hashtags and display top-20 in descending order."""
         try:
             # get SparkSession
             spark = getSparkSessionInstance(rdd.context.getConf())
Next, we call the RDD’s map method to map the data in the RDD to Row objects (from the pyspark.sql package). The RDDs in this example contain tuples of hashtags and counts. The Row constructor uses the names of its keyword arguments to specify the column names for each value in that row. In this case, tag[0] is the hashtag in the tuple, and tag[1] is the total count for that hashtag:
             # map hashtag string-count tuples to Rows
             rows = rdd.map(
                 lambda tag: Row(hashtag=tag[0], total=tag[1]))
The next statement creates a Spark DataFrame containing the Row objects. We’ll use this with Spark SQL to query the data to get the top-20 hashtags with their total counts:
             # create a DataFrame from the Row objects
             hashtags_df = spark.createDataFrame(rows)
To query a Spark DataFrame, first create a table view, which enables Spark SQL to query the DataFrame like a table in a relational database. Spark DataFrame method createOrReplaceTempView creates a temporary table view for the DataFrame and names the view for use in the from clause of a query:
             # create a temporary table view for use with Spark SQL
             hashtags_df.createOrReplaceTempView('hashtags')
Once you have a table view, you can query the data using Spark SQL. The following statement uses the SparkSession instance’s sql method to perform a Spark SQL query that selects the hashtag and total columns from the hashtags table view, orders the selected rows by total in descending (desc) order, then returns the first 20 rows of the result (limit 20). Spark SQL returns a new Spark DataFrame containing the results:
             # use Spark SQL to get top 20 hashtags in descending order
             top20_df = spark.sql(
                 """select hashtag, total
                    from hashtags
                    order by total desc, hashtag asc
                    limit 20""")
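The query uses standard SQL syntax, so you can experiment with it against an in-memory SQLite table; the hashtag counts below are made up for illustration:

```python
import sqlite3

# made-up hashtag totals standing in for the 'hashtags' table view
rows = [('spark', 5), ('python', 9), ('bigdata', 9), ('ai', 2)]

db = sqlite3.connect(':memory:')
db.execute('create table hashtags (hashtag text, total integer)')
db.executemany('insert into hashtags values (?, ?)', rows)

# same shape as the Spark SQL query: highest totals first
top20 = db.execute("""select hashtag, total
                      from hashtags
                      order by total desc, hashtag
                      limit 20""").fetchall()
print(top20)  # [('bigdata', 9), ('python', 9), ('spark', 5), ('ai', 2)]
db.close()
```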
Finally, we pass the Spark DataFrame to our display_barplot utility function. The hashtags and totals will be displayed on the x- and y-axes, respectively. We also display the time at which count_tags was called:
             display_barplot(top20_df, x='hashtag', y='total', time=time)
         except Exception as e:
             print(f'Exception: {e}')
SparkContext

The rest of the code in this notebook sets up Spark streaming to read text from the starttweetstream.py script and specifies how to process the tweets. First, we create the SparkContext for connecting to the Spark cluster:
[5]: sc = SparkContext()
StreamingContext

For Spark streaming, you must create a StreamingContext (module pyspark.streaming), providing as arguments the SparkContext and how often in seconds to process batches of streaming data. In this app, we’ll process batches every 10 seconds—this is the batch interval:
[6]: ssc = StreamingContext(sc, 10)
Depending on how fast data is arriving, you may wish to shorten or lengthen your batch intervals. For a discussion of this and other performance-related issues, see the Performance Tuning section of the Spark Streaming Programming Guide:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
By default, Spark streaming does not maintain state information as you process the stream of RDDs. However, you can use Spark checkpointing to keep track of the streaming state. Checkpointing enables:
fault-tolerance for restarting a stream in cases of cluster node or Spark application failures, and
stateful transformations, such as summarizing the data received so far—as we’re doing in this example.
StreamingContext method checkpoint sets up the checkpointing folder:
[7]: ssc.checkpoint('hashtagsummarizer_checkpoint')
For a Spark streaming application in a cloud-based cluster, you’d specify a location within HDFS to store the checkpoint folder. We’re running this example in the local Jupyter Docker image, so we simply specified the name of a folder, which Spark will create in the current folder (in our case, the ch17 folder’s SparkHashtagSummarizer subfolder). For more details on checkpointing, see
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
StreamingContext method socketTextStream connects to a socket from which a stream of data will be received and returns a DStream that receives the data. The method’s arguments are the hostname and port number to which the StreamingContext should connect—these must match where the starttweetstream.py script is waiting for the connection:
[8]: stream = ssc.socketTextStream('localhost', 9876)
We use functional-style programming calls on a DStream to specify the processing steps to perform on the streaming data. The following call to DStream’s flatMap method tokenizes a line of space-separated hashtags and returns a new DStream representing the individual tags:
[9]: tokenized = stream.flatMap(lambda line: line.split())
Next, similar to the Hadoop mapper earlier in this chapter, we use DStream method map to get a new DStream in which each hashtag is mapped to a hashtag-count pair (in this case as a tuple) in which the count is initially 1:
[10]: mapped = tokenized.map(lambda hashtag: (hashtag, 1))
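The combined effect of flatMap and map on the incoming text can be simulated with plain Python lists, where each string stands for one line delivered over the socket:

```python
# each string stands for one line of space-separated hashtags
lines = ['spark bigdata ', 'python spark ']

# flatMap: split every line and flatten the tokens into one sequence
tokenized = [tag for line in lines for tag in line.split()]

# map: pair each hashtag with an initial count of 1
mapped = [(hashtag, 1) for hashtag in tokenized]
print(mapped)  # [('spark', 1), ('bigdata', 1), ('python', 1), ('spark', 1)]
```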
DStream method updateStateByKey receives a two-argument lambda that totals the counts for a given key and adds them to the prior total for that key:
[11]: hashtag_counts = mapped.updateStateByKey(
          lambda counts, prior_total: sum(counts) + (prior_total or 0))
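The stateful totaling the lambda performs can be traced by hand for one hashtag across two batches: counts holds the new 1s for that hashtag in the current batch, and prior_total holds its running total (None on the first batch):

```python
def update(counts, prior_total):
    # same logic as the lambda passed to updateStateByKey
    return sum(counts) + (prior_total or 0)

total = update([1, 1, 1], None)  # first batch: no prior total yet
total = update([1, 1], total)    # second batch adds to the saved state
print(total)  # 5
```

The `prior_total or 0` idiom handles the first batch, when Spark passes None because no state exists yet for that key.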
Finally, we use DStream method foreachRDD to specify that every processed RDD should be passed to function count_tags, which then summarizes the top-20 hashtags so far and displays a barplot:
[12]: hashtag_counts.foreachRDD(count_tags)
Now that we’ve specified the processing steps, we call the StreamingContext’s start method to connect to the socket and begin the streaming process.
[13]: ssc.start() # start the Spark streaming
The following shows a sample barplot produced while processing a stream of tweets about “football.” Because football is a different sport in the United States and the rest of the world, the hashtags relate to both American football and what we call soccer—we grayed out three hashtags that were not appropriate for publication:
(Fill-In) Spark DataFrame method ________ returns a pandas DataFrame.

Answer: toPandas.
(True/False) You can use Spark SQL to query RDD objects using familiar Structured Query Language syntax.

Answer: False. Spark SQL requires a table view of a Spark DataFrame.
(Discussion) Assuming hashtags_df is a Spark DataFrame, what does the following code do?

hashtags_df.createOrReplaceTempView('hashtags')

Answer: This statement creates (or replaces) a temporary table view for the DataFrame hashtags_df and names it 'hashtags' for use in Spark SQL queries.
(True/False) By default, Spark streaming does not maintain state information as you process the stream of RDDs. However, you can use Spark checkpointing to keep track of the streaming state for fault-tolerance and stateful transformations, such as summarizing the data received so far.
Answer: True.