Chapter 11. Anomaly Detection on Sentiment Analysis

When we look back at the year 2016, we will surely remember it as a time of many significant geopolitical events, ranging from Brexit, Great Britain's vote to leave the European Union, to the untimely passing of many beloved celebrities, including the sudden death of the singer David Bowie (covered in Chapter 6, Scraping Link-Based External Data, and Chapter 7, Building Communities). However, perhaps the most notable occurrence of the year was the tense US presidential election and its eventual outcome, the election of President Donald Trump. It was a campaign that will long be remembered, not least for its unprecedented use of social media and the passion it stirred up among users, most of whom made their feelings known through hashtags: either positive ones, such as #MakeAmericaGreatAgain or #StrongerTogether, or negative ones, such as #DumpTrump or #LockHerUp. Since this chapter is about sentiment analysis, the election presents the ideal use case. However, instead of trying to predict the outcome itself, we will aim to detect abnormal tweets during the US election using a real-time Twitter feed. We will cover the following topics:

  • Acquiring Twitter data in real-time and batch
  • Extracting sentiment using Stanford NLP
  • Storing sentiment time series in Timely
  • Deriving features from only 140 characters using Word2Vec
  • Introducing the concepts of graph ergodicity and shortest paths
  • Training a KMeans model to detect potential anomalies
  • Visualizing models with Embedding Projector from TensorFlow

Following the US elections on Twitter

On November 8, 2016, American citizens went to polling stations in their millions to cast their votes for the next President of the United States. Counting began almost immediately and, although not officially confirmed until sometime later, the forecast result was well known by the next morning. Let's start our investigation a couple of days before the major event itself, on November 6, 2016, so that we can preserve some context in the run-up. Although we do not know in advance exactly what we will find, we know that Twitter would play an outsized role in the political commentary, given its influence in the build-up, so it makes sense to start collecting data as soon as possible. In fact, data scientists may sometimes experience this as a gut feeling: a strange and often exciting notion that compels us to commence working on something without a clear plan or absolute justification, just a sense that it will pay off. And this approach can actually be vital since, given the normal time required to formulate and realize such a plan and the transient nature of events, a major news story may break (refer to Chapter 10, Story De-duplication and Mutation), a new product may have been released, or the stock market may be trending differently (see Chapter 12, TrendCalculus); by this time, the original dataset may no longer be available.

Acquiring data in stream

The first action is to start acquiring Twitter data. As we plan to download more than 48 hours' worth of tweets, the code should be robust enough not to fail somewhere in the middle of the process; there is nothing more frustrating than a fatal NullPointerException occurring after many hours of intense processing. We know we will be working on sentiment analysis at some point down the line, but for now we do not wish to over-complicate our code with large dependencies, as this can decrease stability and lead to more unchecked exceptions. Instead, we will start by collecting and storing the data; subsequent processing will be done offline on the collected data rather than applying this logic to the live stream.

We create a new Spark Streaming context reading from the Twitter 1% firehose using the utility methods created in Chapter 9, News Dictionary and Real-Time Tagging System. We also use the excellent GSON library to serialize Status objects (the Twitter4J class representing tweets) to JSON, which requires the following Maven dependency:

<dependency> 
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
  <version>2.3.1</version>
</dependency>
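
The createTwitterStream utility itself was built in Chapter 9, News Dictionary and Real-Time Tagging System, and is not repeated here. As a reminder, a minimal sketch of what such a helper might look like is shown below; it assumes the spark-streaming-twitter_2.11 artifact is on the classpath and that the Twitter4J OAuth credentials are supplied as twitter4j.oauth.* system properties:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.Status

// Minimal helper: with no Authorization supplied, Twitter4J falls back to the
// twitter4j.oauth.* system properties (or a twitter4j.properties file)
def createTwitterStream(ssc: StreamingContext, filters: Array[String]): DStream[Status] = {
  TwitterUtils.createStream(ssc, None, filters)
}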

We read Twitter data every 5 minutes and can optionally supply Twitter filters as command line arguments. Filters can be keywords such as Trump, Clinton, or #MAGA, #StrongerTogether. However, we must bear in mind that by doing this we may not capture all relevant tweets, as we can never be fully up to date with the latest hashtag trends (such as #DumpTrump, #DrainTheSwamp, #LockHerUp, or #LoveTrumpsHate) and many tweets will be overlooked by an inadequate filter, so we will use an empty filter list to ensure that we catch everything.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Minutes, StreamingContext}
import com.google.gson.GsonBuilder
import twitter4j.Status
import scala.util.Try

val sparkConf = new SparkConf().setAppName("Twitter Extractor")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Minutes(5))

// optional Twitter filters passed as command line arguments
val filter = args

val twitterStream = createTwitterStream(ssc, filter)
  .mapPartitions { it =>
    // one Gson instance per partition; serialization is wrapped in a Try
    val gson = new GsonBuilder().create()
    it.map { s: Status =>
      Try(gson.toJson(s)).toOption
    }
  }

We serialize our Status objects using the GSON library and persist the resulting JSON in HDFS. Note that the serialization occurs within a Try block to ensure that unwanted exceptions are not thrown; instead, we return the JSON as an optional String:

twitterStream
  .filter(_.isDefined)
  .map(_.get)
  .saveAsTextFiles("/path/to/twitter")

Finally, we run our Spark Streaming context and keep it alive until a new president has been elected, no matter what happens!

ssc.start()
ssc.awaitTermination()

Acquiring data in batch

Only 1% of tweets are retrieved through the Spark Streaming API, meaning that 99% of records are discarded. Although we were still able to download around 10 million tweets this way, we can potentially retrieve more data, this time only for a selected hashtag and within a small period of time. For example, we can download all tweets related to the #LockHerUp or #BuildTheWall hashtags.

The search API

For that purpose, we consume Twitter's historical data through the twitter4j Java API. This library comes as a transitive dependency of spark-streaming-twitter_2.11. To use it outside of a Spark project, the following Maven dependency should be used:

<dependency>
  <groupId>org.twitter4j</groupId>
  <artifactId>twitter4j-core</artifactId>
  <version>4.0.4</version>
</dependency>

We create a Twitter4J client as follows:

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setOAuthConsumerKey(apiKey);
builder.setOAuthConsumerSecret(apiSecret);
Configuration configuration = builder.build();

AccessToken token = new AccessToken(
  accessToken,
  accessTokenSecret
);

Twitter twitter =
  new TwitterFactory(configuration)
      .getInstance(token);

Then, we consume the /search/tweets service through the Query object:

Query q = new Query(filter);
q.setSince(fromDate);
q.setUntil(toDate);
q.setCount(100); // the search API returns at most 100 tweets per request

QueryResult r = twitter.search(q);
List<Status> tweets = r.getTweets();

Finally, we get a list of Status objects that can easily be serialized using the GSON library introduced earlier.
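
For the batch output, the serialization step mirrors the streaming job: each Status is converted to JSON and written as one document per line. The downloader itself is written in Java, but a minimal sketch in Scala (persistAsJson is a hypothetical helper, and the output path is only an example) could look like this:

import java.io.PrintWriter
import scala.collection.JavaConverters._
import com.google.gson.GsonBuilder
import twitter4j.Status

// Write one JSON document per line, mirroring the streaming job's output format;
// `tweets` is the list returned by twitter.search(q)
def persistAsJson(tweets: java.util.List[Status], path: String): Unit = {
  val gson = new GsonBuilder().create()
  val writer = new PrintWriter(path)
  try tweets.asScala.foreach(s => writer.println(gson.toJson(s)))
  finally writer.close()
}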

Rate limit

Twitter is a fantastic resource for data science, but it is far from a non-profit organization and knows how to value and price its data. Without any special agreement, the standard search API is limited to a few days of retrospective data, a maximum of 180 queries per 15-minute window, and at most 100 records per query. This limit can be confirmed on both the Twitter developer website (https://dev.twitter.com/rest/public/rate-limits) and from the API itself using the RateLimitStatus class:

Map<String, RateLimitStatus> rls = twitter.getRateLimitStatus("search");
System.out.println(rls.get("/search/tweets"));

/*
RateLimitStatusJSONImpl{remaining=179, limit=180, resetTimeInSeconds=1482102697, secondsUntilReset=873}
*/

Unsurprisingly, any query on popular terms, such as #MAGA on November 9, 2016, quickly hits this threshold. To avoid a rate limit exception, we have to page and throttle our download requests by keeping track of the lowest tweet ID processed so far and monitoring our rate limit status after each search request.

RateLimitStatus strl = rls.get("/search/tweets");
int totalTweets = 0;
long maxID = -1;
for (int i = 0; i < 400; i++) {
 
  // throttling
  if (strl.getRemaining() == 0)
    Thread.sleep(strl.getSecondsUntilReset() * 1000L);
 
  Query q = new Query(filter);
  q.setSince(fromDate);
  q.setUntil(toDate);
  q.setCount(100);
 
  // paging
  if (maxID != -1) q.setMaxId(maxID - 1);
 
  QueryResult r = twitter.search(q);
  for (Status s : r.getTweets()) {
    totalTweets++;
    // keep track of the lowest tweet ID seen so far; it drives the paging above
    if (maxID == -1 || s.getId() < maxID) {
      maxID = s.getId();
    }
    writer.println(gson.toJson(s));
  }
  strl = r.getRateLimitStatus();
}

With around half a billion tweets a day, it would be optimistic, if not naive, to gather all US-related data. Instead, the simple ingest process detailed earlier should be used to intercept tweets matching specific queries only. Packaged as the main class in an assembly JAR, it can be executed as follows:

java -Dtwitter.properties=twitter.properties \
  -jar trump-1.0.jar '#maga' 2016-11-08 2016-11-09 \
  /path/to/twitter-maga.json

Here, the twitter.properties file contains your Twitter API keys:

twitter.token = XXXXXXXXXXXXXX
twitter.token.secret = XXXXXXXXXXXXXX
twitter.api.key = XXXXXXXXXXXXXX
twitter.api.secret = XXXXXXXXXXXXXX
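
The listings above do not show how this file is parsed. As a minimal sketch (in Scala, although the batch downloader itself is Java), the file supplied through -Dtwitter.properties can be loaded into the four values, apiKey, apiSecret, accessToken, and accessTokenSecret, consumed by the ConfigurationBuilder and AccessToken shown earlier:

import java.io.FileInputStream
import java.util.Properties

// Load the property file passed via -Dtwitter.properties=twitter.properties
val props = new Properties()
props.load(new FileInputStream(System.getProperty("twitter.properties")))

// The four credentials consumed by the ConfigurationBuilder and AccessToken
val apiKey            = props.getProperty("twitter.api.key")
val apiSecret         = props.getProperty("twitter.api.secret")
val accessToken       = props.getProperty("twitter.token")
val accessTokenSecret = props.getProperty("twitter.token.secret")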