17.7 Spark Streaming: Counting Twitter Hashtags Using the pyspark-notebook Docker Stack

In this section, you’ll create and run a Spark streaming application in which you’ll receive a stream of tweets on the topic(s) you specify and summarize the top-20 hashtags in a bar chart that updates every 10 seconds. For this example, you’ll use the Jupyter Docker container from the first Spark example.

There are two parts to this example. First, using the techniques from the “Data Mining Twitter” chapter, you’ll create a script that streams tweets from Twitter. Then, we’ll use Spark streaming in a Jupyter Notebook to read the tweets and summarize the hashtags.

The two parts will communicate with one another via networking sockets—a low-level view of client/server networking in which a client app communicates with a server app over a network using techniques similar to file I/O. A program can read from a socket or write to a socket similarly to reading from a file or writing to a file. The socket represents one endpoint of a connection. In this case, the client will be a Spark application, and the server will be a script that receives streaming tweets and sends them to the Spark app.
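To make the socket pattern concrete before diving into the Twitter code, here is a minimal sketch of the two sides (unrelated to Twitter; the port number, file names and message are arbitrary choices for illustration). You’d run the server in one process, then the client in another:

# server.py -- wait for one client, send it a line of text, then clean up
import socket

server = socket.socket()
server.bind(('localhost', 9999))       # arbitrary port for this sketch
server.listen()                        # wait for a client to connect
connection, address = server.accept()  # blocks until the client arrives
connection.send('hello over a socket\n'.encode('utf-8'))
connection.close()
server.close()

# client.py -- connect to the server and read the line it sends
import socket

client = socket.create_connection(('localhost', 9999))
print(client.recv(1024).decode('utf-8'))
client.close()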

Launching the Docker Container and Installing Tweepy

For this example, you’ll install the Tweepy library into the Jupyter Docker container. Follow Section 17.6.2’s instructions for launching the container and installing Python libraries into it. Use the following command to install Tweepy:

pip install tweepy

17.7.1 Streaming Tweets to a Socket

The script starttweetstream.py contains a modified version of the TweetListener class from the “Data Mining Twitter” chapter. It streams the specified number of tweets and sends them to a socket on the local computer. When the tweet limit is reached, the script closes the socket. You’ve already used Twitter streaming, so we’ll focus only on what’s new. Ensure that the file keys.py (in the ch17 folder’s SparkHashtagSummarizer subfolder) contains your Twitter credentials.
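If you have not created keys.py before, it’s simply a module that defines the four credential variables the script imports. A minimal sketch (with placeholder values you’d replace with your own credentials) looks like this:

# keys.py -- placeholder credentials; substitute your own values
consumer_key = 'YourConsumerKey'
consumer_secret = 'YourConsumerSecret'
access_token = 'YourAccessToken'
access_token_secret = 'YourAccessTokenSecret'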

Executing the Script in the Docker Container

In this example, you’ll use JupyterLab’s Terminal window to execute starttweetstream.py in one tab, then use a notebook to perform the Spark task in another tab. With the Jupyter pyspark-notebook Docker container running, open

http://localhost:8888/lab

in your web browser. In JupyterLab, select File > New > Terminal to open a new tab containing a Terminal. This is a Linux-based command line. Typing the ls command and pressing Enter lists the current folder’s contents. By default, you’ll see the container’s work folder.

To execute starttweetstream.py, you must first navigate to the SparkHashtagSummarizer folder with the command:

cd work/SparkHashtagSummarizer

You can now execute the script with the command of the form

ipython starttweetstream.py number_of_tweets search_terms

where number_of_tweets specifies the total number of tweets to process and search_terms is one or more space-separated strings to use for filtering tweets. For example, the following command would stream 1000 tweets about football:

ipython starttweetstream.py 1000 football

At this point, the script will display "Waiting for connection" and will wait until Spark connects to begin streaming the tweets.

starttweetstream.py import Statements

For discussion purposes, we’ve divided starttweetstream.py into pieces. First, we import the modules used in the script. The Python Standard Library’s socket module provides the capabilities that enable Python apps to communicate via sockets.

1 # starttweetstream.py
2 """Script to get tweets on topic(s) specified as script argument(s)
3    and send tweet text to a socket for processing by Spark."""
4 import keys
5 import socket
6 import sys
7 import tweepy
8

Class TweetListener

Once again, you’ve seen most of the code in class TweetListener, so we focus only on what’s new here:

  • Method __init__ (lines 12–17) now receives a connection parameter representing the socket and stores it in the self.connection attribute. We use this socket to send the hashtags to the Spark application.

  • In method on_status (lines 24–44), lines 27–32 extract the hashtags from the Tweepy Status object, convert them to lowercase and create a space-separated string of the hashtags to send to Spark. The key statement is line 39:

    self.connection.send(hashtags_string.encode('utf-8'))

    which uses the connection object’s send method to send the tweet text to whatever application is reading from that socket. Method send expects as its argument a sequence of bytes. The string method call encode('utf-8') converts the string to bytes. Spark will automatically read the bytes and reconstruct the strings.
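As a quick illustration of that conversion, using a hypothetical hashtag string:

    hashtags_string = '#ai #bigdata\n'
    data = hashtags_string.encode('utf-8')   # bytes: b'#ai #bigdata\n'
    data.decode('utf-8')                     # back to the original string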

9 class TweetListener(tweepy.StreamListener):
10    """Handles incoming Tweet stream."""
11
12    def __init__(self, api, connection, limit=10000):
13        """Create instance variables for tracking number of tweets."""
14        self.connection = connection
15        self.tweet_count = 0
16        self.TWEET_LIMIT = limit # 10,000 by default
17        super().__init__(api) # call superclass's init
18
19    def on_connect(self):
20        """Called when your connection attempt is successful, enabling
21        you to perform appropriate application tasks at that point."""
22        print('Successfully connected to Twitter\n')
23
24    def on_status(self, status):
25        """Called when Twitter pushes a new tweet to you."""
26        # get the hashtags
27        hashtags = []
28
29        for hashtag_dict in status.entities['hashtags']:
30            hashtags.append(hashtag_dict['text'].lower())
31
32        hashtags_string = ' '.join(hashtags) + '\n'
33        print(f'Screen name: {status.user.screen_name}:')
34        print(f' Hashtags: {hashtags_string}')
35        self.tweet_count += 1 # track number of tweets processed
36
37        try:
38            # send requires bytes, so encode the string in utf-8 format
39            self.connection.send(hashtags_string.encode('utf-8'))
40        except Exception as e:
41            print(f'Error: {e}')
42
43        # if TWEET_LIMIT is reached, return False to terminate streaming
44        return self.tweet_count != self.TWEET_LIMIT
45
46    def on_error(self, status):
47        print(status)
48        return True
49

Main Application

Lines 50–80 execute when you run the script. You’ve connected to Twitter to stream tweets previously, so here we discuss only what’s new in this example.

Line 51 gets the number of tweets to process by converting the command-line argument sys.argv[1] to an integer. Recall that element 0 represents the script’s name.

50 if __name__ == '__main__':
51     tweet_limit = int(sys.argv[1]) # get maximum number of tweets

Line 52 calls the socket module’s socket function, which returns a socket object that we’ll use to wait for a connection from the Spark application.

52     client_socket = socket.socket() # create a socket
53

Line 55 calls the socket object’s bind method with a tuple containing the hostname or IP address of the computer and the port number on that computer. Together these represent where this script will wait for an initial connection from another app:

54     # app will use localhost (this computer) port 9876
55     client_socket.bind(('localhost', 9876))
56

Line 58 calls the socket’s listen method, which causes the script to wait until a connection is received. This is the statement that prevents the Twitter stream from starting until the Spark application connects.

57     print('Waiting for connection')
58     client_socket.listen() # wait for client to connect
59

Once the Spark application connects, line 61 calls socket method accept, which accepts the connection. This method returns a tuple containing a new socket object that the script will use to communicate with the Spark application and the IP address of the Spark application’s computer.

60     # when connection received, get connection/client address
61     connection, address = client_socket.accept()
62     print(f'Connection received from {address}')
63

Next, we authenticate with Twitter and start the stream. Lines 73–74 set up the stream, passing the socket object connection to the TweetListener so that it can use the socket to send hashtags to the Spark application.

64     # configure Twitter access
65     auth = tweepy.OAuthHandler(keys.consumer_key, keys.consumer_secret)
66     auth.set_access_token(keys.access_token, keys.access_token_secret)
67
68     # configure Tweepy to wait if Twitter rate limits are reached
69     api = tweepy.API(auth, wait_on_rate_limit=True,
70                      wait_on_rate_limit_notify=True)
71
72     # create the Stream
73     twitter_stream = tweepy.Stream(api.auth,
74          TweetListener(api, connection, tweet_limit))
75
76     # sys.argv[2] is the first search term
77     twitter_stream.filter(track=sys.argv[2:])
78

Finally, lines 79–80 call the close method on the socket objects to release their resources.

79     connection.close()
80     client_socket.close()
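Incidentally, you can sanity-check starttweetstream.py without Spark by connecting a tiny client of your own. The following sketch (not part of the book’s code; the chunk size and loop count are arbitrary) connects to port 9876 and prints a few chunks of incoming hashtag text:

# test_client.py -- hypothetical helper for checking the tweet server
import socket

with socket.create_connection(('localhost', 9876)) as sock:
    for _ in range(5):
        data = sock.recv(1024)       # read up to 1024 bytes
        if not data:
            break                    # server closed the connection
        print(data.decode('utf-8'), end='')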

17.7.2 Summarizing Tweet Hashtags; Introducing Spark SQL

In this section, you’ll use Spark streaming to read the hashtags sent via a socket by the script starttweetstream.py and summarize the results. You can either create a new notebook and enter the code you see here or load the hashtagsummarizer.ipynb notebook we provide in the ch17 examples folder’s SparkHashtagSummarizer subfolder.

Importing the Libraries

First, let’s import the libraries used in this notebook. We’ll explain the pyspark classes as we use them. From IPython, we import the display module, which contains classes and utility functions that you can use in Jupyter. In particular, we’ll use the clear_output function to remove an existing chart before displaying a new one:

[1]: from pyspark import SparkContext
     from pyspark.streaming import StreamingContext
     from pyspark.sql import Row, SparkSession
     from IPython import display
     import matplotlib.pyplot as plt
     import seaborn as sns
     %matplotlib inline

This Spark application summarizes hashtags in 10-second batches. After processing each batch, it displays a Seaborn barplot. The IPython magic

%matplotlib inline

indicates that Matplotlib-based graphics should be displayed in the notebook rather than in their own windows. Recall that Seaborn uses Matplotlib.

We’ve used several IPython magics throughout the book. There are many magics specifically for use in Jupyter Notebooks. For the complete list of magics see:

https://ipython.readthedocs.io/en/stable/interactive/magics.html 

Utility Function to Get the SparkSession

As you’ll soon see, you can use Spark SQL to query data in resilient distributed datasets (RDDs). Spark SQL uses a Spark DataFrame to get a table view of the underlying RDDs. A SparkSession (module pyspark.sql) is used to create a DataFrame from an RDD.

There can be only one SparkSession object per Spark application. The following function, which we borrowed from the Spark Streaming Programming Guide, defines the correct way to get a SparkSession instance if it already exists or to create one if it does not yet exist:

[2]: def getSparkSessionInstance(sparkConf):
         """Spark Streaming Programming Guide's recommended method
            for getting an existing SparkSession or creating a new one."""
         if ("sparkSessionSingletonInstance" not in globals()):
             globals()["sparkSessionSingletonInstance"] = SparkSession \
                 .builder \
                 .config(conf=sparkConf) \
                 .getOrCreate()
         return globals()["sparkSessionSingletonInstance"]

Utility Function to Display a Barchart Based on a Spark DataFrame

We call function display_barplot after Spark processes each batch of hashtags. Each call clears the previous Seaborn barplot, then displays a new one based on the Spark DataFrame it receives. First, we call the Spark DataFrame’s toPandas method to convert it to a pandas DataFrame for use with Seaborn. Next, we call the clear_output function from the IPython.display module. The keyword argument wait=True indicates that the function should remove the prior graph (if there is one), but only once the new graph is ready to display. The rest of the code in the function uses standard Seaborn techniques we’ve shown previously. The function call sns.color_palette('cool', 20) selects twenty colors from the Matplotlib 'cool' color palette:

[3]: def display_barplot(spark_df, x, y, time, scale=2.0, size=(16, 9)):
          """Displays a Spark DataFrame's contents as a bar plot."""
          df = spark_df.toPandas()

          # remove prior graph when new one is ready to display
          display.clear_output(wait=True)
          print(f'TIME: {time}')

          # create and configure a Figure containing a Seaborn barplot
          plt.figure(figsize=size)
          sns.set(font_scale=scale)
          barplot = sns.barplot(data=df, x=x, y=y,
                                palette=sns.color_palette('cool', 20))

          # rotate the x-axis labels 90 degrees for readability
          for item in barplot.get_xticklabels():
              item.set_rotation(90)

          plt.tight_layout()
          plt.show()

Utility Function to Summarize the Top-20 Hashtags So Far

In Spark streaming, a DStream is a sequence of RDDs, each representing a mini-batch of data to process. As you’ll soon see, you can specify a function that is called to perform a task for every RDD in the stream. In this app, the function count_tags will summarize the hashtag counts in a given RDD, add them to the current totals (maintained by the SparkSession), then display an updated top-20 barplot so that we can see how the top-20 hashtags are changing over time. For discussion purposes, we’ve broken this function into smaller pieces. First, we get the SparkSession by calling the utility function getSparkSessionInstance with the SparkContext’s configuration information. Every RDD has access to the SparkContext via the context attribute:

[4]: def count_tags(time, rdd):
         """Count hashtags and display top-20 in descending order."""
         try:
            # get SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())

Next, we call the RDD’s map method to map the data in the RDD to Row objects (from the pyspark.sql package). The RDDs in this example contain tuples of hashtags and counts. The Row constructor uses the names of its keyword arguments to specify the column names for each value in that row. In this case, tag[0] is the hashtag in the tuple, and tag[1] is the total count for that hashtag:

# map hashtag string-count tuples to Rows
rows = rdd.map(
   lambda tag: Row(hashtag=tag[0], total=tag[1]))

The next statement creates a Spark DataFrame containing the Row objects. We’ll use this with Spark SQL to query the data to get the top-20 hashtags with their total counts:

# create a DataFrame from the Row objects
hashtags_df = spark.createDataFrame(rows)

To query a Spark DataFrame, first create a table view, which enables Spark SQL to query the DataFrame like a table in a relational database. Spark DataFrame method createOrReplaceTempView creates a temporary table view for the DataFrame and names the view for use in the from clause of a query:

# create a temporary table view for use with Spark SQL
hashtags_df.createOrReplaceTempView('hashtags')

Once you have a table view, you can query the data using Spark SQL. The following statement uses the SparkSession instance’s sql method to perform a Spark SQL query that selects the hashtag and total columns from the hashtags table view, orders the selected rows by total in descending (desc) order, then returns the first 20 rows of the result (limit 20). Spark SQL returns a new Spark DataFrame containing the results:

# use Spark SQL to get top 20 hashtags in descending order
top20_df = spark.sql(
    """select hashtag, total
           from hashtags
           order by total desc, hashtag asc
           limit 20""")

Finally, we pass the Spark DataFrame to our display_barplot utility function. The hashtags and totals will be displayed on the x- and y-axes, respectively. We also display the time at which count_tags was called:

    display_barplot(top20_df, x='hashtag', y='total', time=time)
except Exception as e:
    print(f'Exception: {e}')

Getting the SparkContext

The rest of the code in this notebook sets up Spark streaming to read text from the starttweetstream.py script and specifies how to process the tweets. First, we create the SparkContext for connecting to the Spark cluster:

[5]: sc = SparkContext()

Getting the StreamingContext

For Spark streaming, you must create a StreamingContext (module pyspark.streaming), providing as arguments the SparkContext and how often in seconds to process batches of streaming data. In this app, we’ll process batches every 10 seconds—this is the batch interval:

[6]: ssc = StreamingContext(sc, 10)

Depending on how fast data is arriving, you may wish to shorten or lengthen your batch intervals. For a discussion of this and other performance-related issues, see the Performance Tuning section of the Spark Streaming Programming Guide:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

Setting Up a Checkpoint for Maintaining State

By default, Spark streaming does not maintain state information as you process the stream of RDDs. However, you can use Spark checkpointing to keep track of the streaming state. Checkpointing enables:

  • fault-tolerance for restarting a stream in cases of cluster node or Spark application failures, and

  • stateful transformations, such as summarizing the data received so far—as we’re doing in this example.

StreamingContext method checkpoint sets up the checkpointing folder:

[7]: ssc.checkpoint('hashtagsummarizer_checkpoint')

For a Spark streaming application in a cloud-based cluster, you’d specify a location within HDFS to store the checkpoint folder. We’re running this example in the local Jupyter Docker image, so we simply specified the name of a folder, which Spark will create in the current folder (in our case, the ch17 folder’s SparkHashtagSummarizer). For more details on checkpointing, see

https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
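As a concrete illustration of the cluster case mentioned above, the checkpoint call might instead point at an HDFS location (the path shown here is hypothetical):

# on a cloud-based cluster, you'd use an HDFS path instead (hypothetical)
ssc.checkpoint('hdfs:///user/your_user_name/hashtagsummarizer_checkpoint')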

Connecting to the Stream via a Socket

StreamingContext method socketTextStream connects to a socket from which a stream of data will be received and returns a DStream that receives the data. The method’s arguments are the hostname and port number to which the StreamingContext should connect—these must match where the starttweetstream.py script is waiting for the connection:

[8]: stream = ssc.socketTextStream('localhost', 9876)

Tokenizing the Lines of Hashtags

We use functional-style programming calls on a DStream to specify the processing steps to perform on the streaming data. The following call to DStream’s flatMap method tokenizes a line of space-separated hashtags and returns a new DStream representing the individual tags:

[9]: tokenized = stream.flatMap(lambda line: line.split())
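For intuition, here is the lambda’s effect in plain Python on a couple of hypothetical lines, followed by the flattening that flatMap performs:

lines = ['#ai #bigdata\n', '#ml #ai\n']            # hypothetical stream lines
[tag for line in lines for tag in line.split()]    # ['#ai', '#bigdata', '#ml', '#ai']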

Mapping the Hashtags to Tuples of Hashtag-Count Pairs

Next, similar to the Hadoop mapper earlier in this chapter, we use DStream method map to get a new DStream in which each hashtag is mapped to a hashtag-count pair (in this case as a tuple) in which the count is initially 1:

[10]: mapped = tokenized.map(lambda hashtag: (hashtag, 1))

Totaling the Hashtag Counts So Far

DStream method updateStateByKey receives a two-argument lambda that totals the counts for a given key and adds them to the prior total for that key:

[11]: hashtag_counts = mapped.updateStateByKey(
          lambda counts, prior_total: sum(counts) + (prior_total or 0))
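To see what the lambda does for a single hashtag key, here it is applied to some hypothetical values across two batches:

update = lambda counts, prior_total: sum(counts) + (prior_total or 0)
update([1, 1, 1], None)   # 3 -- first batch: no prior total yet
update([1, 1], 3)         # 5 -- next batch adds to the running total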

Specifying the Method to Call for Every RDD

Finally, we use DStream method foreachRDD to specify that every processed RDD should be passed to function count_tags, which then summarizes the top-20 hashtags so far and displays a barplot:

[12]: hashtag_counts.foreachRDD(count_tags)

Starting the Spark Stream

Now that we’ve specified the processing steps, we call the StreamingContext’s start method to connect to the socket and begin the streaming process.

[13]: ssc.start() # start the Spark streaming
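The stream now runs until it’s stopped. Though not shown in the original notebook, you could later shut it down from another cell with StreamingContext’s stop method:

# stop streaming; keep the SparkContext alive in case you want to
# create another StreamingContext afterward
ssc.stop(stopSparkContext=False)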

The following shows a sample barplot produced while processing a stream of tweets about “football.” Because football is a different sport in the United States and the rest of the world, the hashtags relate to both American football and what we call soccer. We grayed out three hashtags that were not appropriate for publication:

An example of a bar plot for a stream of tweets about football.

Self Check for Section 17.7

  1. (Fill-In) Spark DataFrame method       returns a pandas DataFrame.
    Answer: toPandas.

  2. (True/False) You can use Spark SQL to query RDD objects using familiar Structured Query Language syntax.
    Answer: False. Spark SQL requires a table view of a Spark DataFrame.

  3. (Discussion) Assuming hashtags_df is a Spark DataFrame, what does the following code do?

    hashtags_df.createOrReplaceTempView('hashtags')


    Answer: This statement creates (or replaces) a temporary table view for the DataFrame hashtags_df and names it 'hashtags' for use in Spark SQL queries.

  4. (True/False) By default, Spark streaming does not maintain state information as you process the stream of RDDs. However, you can use Spark checkpointing to keep track of the streaming state for fault-tolerance and stateful transformations, such as summarizing the data received so far.
    Answer: True.
