When we look back at 2016, we will surely remember it as a year of significant geopolitical events, ranging from Brexit, Great Britain's vote to leave the European Union, to the untimely passing of many beloved celebrities, including the sudden death of the singer David Bowie (covered in Chapter 6, Scraping Link-Based External Data, and Chapter 7, Building Communities). Perhaps the most notable occurrence of the year, however, was the tense US presidential election and its eventual outcome: the election of President Donald Trump. The campaign will long be remembered, not least for its unprecedented use of social media and the passion it stirred up among users, most of whom made their feelings known through hashtags: positive ones, such as #MakeAmericaGreatAgain or #StrongerTogether, or negative ones, such as #DumpTrump or #LockHerUp. Since this chapter is about sentiment analysis, the election presents an ideal use case. However, instead of trying to predict the outcome itself, we will aim to detect abnormal tweets during the US election using a real-time Twitter feed.
On November 8, 2016, American citizens went to polling stations in their millions to cast their votes for the next President of the United States. Counting began almost immediately and, although not officially confirmed until some time later, the forecast result was well known by the next morning. Let's start our investigation a couple of days before the main event itself, on November 6, 2016, so that we can preserve some context in the run-up. Although we do not know in advance exactly what we will find, we know that Twitter will play an outsized role in the political commentary, given its influence in the build-up, so it makes sense to start collecting data as soon as possible. In fact, data scientists may sometimes experience this as a gut feeling: a strange and often exciting notion that compels us to start working on something without a clear plan or absolute justification, just a sense that it will pay off. This approach can actually be vital since, given the normal time required to formulate and realize such a plan and the transient nature of events, a major news story may break (refer to Chapter 10, Story De-duplication and Mutation), a new product may be released, or the stock market may start trending differently (see Chapter 12, TrendCalculus); by that time, the original dataset may no longer be available.
The first action is to start acquiring Twitter data. As we plan to download more than 48 hours' worth of tweets, the code should be robust enough not to fail somewhere in the middle of the process; there is nothing more frustrating than a fatal NullPointerException occurring after many hours of intense processing. We know we will be working on sentiment analysis at some point down the line, but for now we do not wish to over-complicate our code with large dependencies, as this can decrease stability and lead to more unchecked exceptions. Instead, we will start by collecting and storing the data; subsequent processing will be done offline on the collected data, rather than applying this logic to the live stream.
We create a new Streaming context reading from the Twitter 1% firehose using the utility methods created in Chapter 9, News Dictionary and Real-Time Tagging System. We also use the excellent GSON library to serialize Status objects (the Twitter4J class wrapping tweet records) to JSON:
<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
  <version>2.3.1</version>
</dependency>
We read Twitter data every 5 minutes and can optionally supply Twitter filters as command-line arguments. Filters can be keywords such as Trump, Clinton, or #MAGA, #StrongerTogether. However, we must bear in mind that by doing this we may not capture all relevant tweets, as we can never be fully up to date with the latest hashtag trends (such as #DumpTrump, #DrainTheSwamp, #LockHerUp, or #LoveTrumpsHate), and many tweets would be overlooked with an inadequate filter, so we will use an empty filter list to ensure that we catch everything.
val sparkConf = new SparkConf().setAppName("Twitter Extractor")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Minutes(5))

val filter = args

val twitterStream = createTwitterStream(ssc, filter)
  .mapPartitions { it =>
    val gson = new GsonBuilder().create()
    it.map { s: Status =>
      Try(gson.toJson(s)).toOption
    }
  }
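To see why an inadequate filter loses data, consider a minimal sketch of how a keyword filter behaves. The helper below is hypothetical (it does not use Twitter4J or the real track filter) and simply treats each filter term as a case-insensitive substring match against the tweet text; a tweet tagged only with #DrainTheSwamp slips past a filter built from trump and #maga:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class FilterCoverage {

    // Hypothetical approximation of a keyword filter:
    // a tweet matches if it contains any filter term, case-insensitively
    static boolean matches(String tweet, List<String> filters) {
        String lower = tweet.toLowerCase(Locale.ROOT);
        return filters.stream()
                .anyMatch(f -> lower.contains(f.toLowerCase(Locale.ROOT)));
    }

    public static void main(String[] args) {
        List<String> filters = Arrays.asList("trump", "#maga");
        // caught by the filter
        System.out.println(matches("Get out and vote! #MAGA", filters));
        // missed: a relevant tweet using a hashtag we did not anticipate
        System.out.println(matches("Time to #DrainTheSwamp", filters));
    }
}
```

With an empty filter list, every tweet from the 1% sample is kept and no such blind spot exists.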
We serialize our Status class using the GSON library and persist our JSON objects in HDFS. Note that the serialization occurs within a Try clause to ensure that unwanted exceptions are not thrown; instead, we return the JSON as an optional String:
twitterStream
  .filter(_.isSuccess)
  .map(_.get)
  .saveAsTextFiles("/path/to/twitter")
Finally, we run our Spark Streaming context and keep it alive until a new president has been elected, no matter what happens!
ssc.start()
ssc.awaitTermination()
Only 1% of tweets are retrieved through the Spark Streaming API, meaning that 99% of records are discarded. Although we were still able to download around 10 million tweets this way, we can potentially collect more data, this time for a selected hashtag and within a small period of time. For example, we can download all tweets related to the #LockHerUp or #BuildTheWall hashtags.
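The 10 million figure is easy to sanity-check with back-of-the-envelope arithmetic. Assuming roughly half a billion tweets per day globally (the figure quoted later in this section), a 1% sample collected over about 48 hours gives:

```java
public class SampleVolume {
    public static void main(String[] args) {
        long tweetsPerDay = 500_000_000L; // assumed global daily tweet volume
        double sampleRate = 0.01;         // the 1% streaming sample
        int collectionDays = 2;           // roughly 48 hours of collection

        long expected = (long) (tweetsPerDay * sampleRate * collectionDays);
        System.out.println(expected);     // prints 10000000
    }
}
```

This is only an order-of-magnitude estimate, of course; the actual count depends on global tweet volume over the collection window.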
For that purpose, we consume Twitter historical data through the twitter4j Java API. This library comes as a transitive dependency of spark-streaming-twitter_2.11. To use it outside of a Spark project, the following Maven dependency should be used:
<dependency>
  <groupId>org.twitter4j</groupId>
  <artifactId>twitter4j-core</artifactId>
  <version>4.0.4</version>
</dependency>
We create a Twitter4J client as follows:
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setOAuthConsumerKey(apiKey);
builder.setOAuthConsumerSecret(apiSecret);
Configuration configuration = builder.build();

AccessToken token = new AccessToken(
  accessToken,
  accessTokenSecret
);

Twitter twitter = new TwitterFactory(configuration)
  .getInstance(token);
Then, we consume the /search/tweets service through the Query object:
Query q = new Query(filter);
q.setSince(fromDate);
q.setUntil(toDate);
q.setCount(100); // the search API returns at most 100 tweets per query

QueryResult r = twitter.search(q);
List<Status> tweets = r.getTweets();
Finally, we get a list of Status objects that can easily be serialized using the GSON library introduced earlier.
Twitter is a fantastic resource for data science, but it is far from a non-profit organization, and as such, it knows how to value and price data. Without any special agreement, the search API is limited to a few days of retrospective data, a maximum of 180 queries per 15-minute window, and at most 100 records per query. This limit can be confirmed on both the Twitter developer website (https://dev.twitter.com/rest/public/rate-limits) and from the API itself using the RateLimitStatus class:
Map<String, RateLimitStatus> rls = twitter.getRateLimitStatus("search");
System.out.println(rls.get("/search/tweets"));

/*
RateLimitStatusJSONImpl{remaining=179, limit=180,
resetTimeInSeconds=1482102697, secondsUntilReset=873}
*/
Unsurprisingly, any queries on popular terms, such as #MAGA on November 9, 2016, hit this threshold. To avoid a rate limit exception, we have to page and throttle our download requests, keeping track of the lowest tweet ID processed so far (to supply as the max_id of the next query) and monitoring our rate limit status after each search request.
RateLimitStatus strl = rls.get("/search/tweets");
int totalTweets = 0;
long maxID = -1;

for (int i = 0; i < 400; i++) {

  // throttling: wait for the window to reset when no calls remain
  if (strl.getRemaining() == 0) {
    Thread.sleep(strl.getSecondsUntilReset() * 1000L);
  }

  Query q = new Query(filter);
  q.setSince(fromDate);
  q.setUntil(toDate);
  q.setCount(100);

  // paging: only ask for tweets older than the lowest ID seen so far
  if (maxID != -1) {
    q.setMaxId(maxID - 1);
  }

  QueryResult r = twitter.search(q);
  for (Status s: r.getTweets()) {
    totalTweets++;
    if (maxID == -1 || s.getId() < maxID) {
      maxID = s.getId();
    }
    writer.println(gson.toJson(s));
  }

  strl = r.getRateLimitStatus();
}
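The max_id paging logic is easiest to verify in isolation. The following self-contained sketch (a hypothetical in-memory stub, with no Twitter4J calls) simulates the /search/tweets endpoint over a newest-first timeline of descending IDs, and confirms that tracking the lowest ID seen and requesting maxID - 1 on the next call walks the full result set exactly once, with no duplicates and no gaps:

```java
import java.util.ArrayList;
import java.util.List;

public class MaxIdPaging {

    // Simulated /search/tweets: newest-first IDs no greater than maxId,
    // capped at 'count' results; maxId == -1 means no upper bound
    static List<Long> search(List<Long> timeline, long maxId, int count) {
        List<Long> page = new ArrayList<>();
        for (long id : timeline) {
            if ((maxId == -1 || id <= maxId) && page.size() < count) {
                page.add(id);
            }
        }
        return page;
    }

    // Page through the whole timeline, tracking the lowest ID seen so far
    static int fetchAll(List<Long> timeline, int pageSize) {
        long maxID = -1;
        int total = 0;
        while (true) {
            List<Long> page =
                search(timeline, maxID == -1 ? -1 : maxID - 1, pageSize);
            if (page.isEmpty()) break;
            for (long id : page) {
                total++;
                if (maxID == -1 || id < maxID) maxID = id;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> timeline = new ArrayList<>();
        for (long id = 1000; id > 0; id -= 10) {
            timeline.add(id); // 100 tweets with descending IDs
        }
        System.out.println(fetchAll(timeline, 25)); // prints 100
    }
}
```

Because tweet IDs are (roughly) time-ordered and the endpoint returns the newest matches first, stepping max_id below the lowest ID already seen is exactly what moves each subsequent page further back in time.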
With around half a billion tweets a day, it would be optimistic, if not naive, to gather all US-related data. Instead, the simple ingest process detailed earlier should be used to intercept only tweets matching specific queries. Packaged as the main class in an assembly jar, it can be executed as follows:
java -Dtwitter.properties=twitter.properties \
  -jar trump-1.0.jar '#maga' 2016-11-08 2016-11-09 \
  /path/to/twitter-maga.json
Here, the twitter.properties file contains your Twitter API keys:
twitter.token = XXXXXXXXXXXXXX
twitter.token.secret = XXXXXXXXXXXXXX
twitter.api.key = XXXXXXXXXXXXXX
twitter.api.secret = XXXXXXXXXXXXXX