Processing Twitter data

The second main constraint of using Twitter is noise. While most classification models are trained against dozens of distinct classes, we will be working against hundreds of thousands of distinct hashtags per day. We will focus on popular topics only, meaning the trending topics occurring within a defined batch window. However, because a 15-minute batch size on Twitter is not sufficient to detect trends, we will apply a 24-hour moving window over which all hashtags are observed and counted, and only the most popular ones are kept.

Figure 9: Twitter online layer, batch and window size

Using this approach, we reduce the noise of unpopular hashtags, making our classifier much more accurate and scalable, and we significantly reduce the number of articles to fetch, as we only focus on trending URLs mentioned alongside popular topics. This saves a great deal of time and resources otherwise spent analyzing data that is irrelevant with regard to a classification model.
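
The batch and window sizes are fixed when the streaming context and the Twitter stream are created. The following is a minimal sketch of that configuration, assuming a 15-minute batch interval, a 24-hour window expressed in minutes, and Twitter credentials supplied through twitter4j properties; the variable names batchSize and windowSize are assumptions matching the listings that follow:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.Status

// Assumed values: 15-minute batches, 24-hour window expressed in minutes
val batchSize  = 15
val windowSize = 24 * 60

val sparkConf = new SparkConf().setAppName("TwitterTrends")
val ssc = new StreamingContext(sparkConf, Minutes(batchSize))

// Twitter credentials are expected as twitter4j system properties
val twitterStream: DStream[Status] = TwitterUtils.createStream(ssc, None)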

Extracting URLs and hashtags

We extract both the clean hashtags (those longer than minHashTagLength characters and containing no digits; yet another measure to reduce noise) and references to valid URLs. Note the use of Scala's Try, which catches any exception thrown when constructing a URL object. Only the tweets matching both of these conditions will be kept:

def extractTags(tweet: String) = {
  StringUtils.stripAccents(tweet.toLowerCase())
    .split("\\s")
    .filter { word =>
      word.startsWith("#") &&
        word.length > minHashTagLength &&
        word.matches("#[a-z]+")
    }
}

def extractUrls(tweet: String) = {
  tweet.split("\\s")
    .filter(_.startsWith("http"))
    .map(_.trim)
    .filter(url => Try(new URL(url)).isSuccess)
}

def getLabeledUrls(twitterStream: DStream[Status]) = {
  twitterStream flatMap { tweet =>
    val tags = extractTags(tweet.getText)
    val urls = extractUrls(tweet.getText)
    urls map { url =>
      (url, tags)
    }
  }
}

val labeledUrls = getLabeledUrls(twitterStream)

Keeping popular hashtags

The basic idea of this step is to execute a simple word count over a 24-hour time window. We extract all hashtags, assign each a value of 1, and count the number of occurrences using a reduce function. In a streaming context, the reduceByKey function can be applied over a window (which must be larger than the batch size) using the reduceByKeyAndWindow method. Although this term frequency dictionary is updated at each batch (so the current top ten hashtags are printed out every 15 minutes), the data is counted over the larger 24-hour period:

def getTrends(twitterStream: DStream[Status]) = {

  val stream = twitterStream
    .flatMap { tweet =>
      extractTags(tweet.getText)
    }
    .map(_ -> 1)
    .reduceByKeyAndWindow(_ + _, Minutes(windowSize))

  stream.foreachRDD { rdd =>
    val top10 = rdd.sortBy(_._2, ascending = false).take(10)
    top10.foreach { case (hashTag, count) =>
      println(s"[$hashTag] - $count")
    }
  }

  stream
}

val twitterTrend = getTrends(twitterStream)

In a batch processing context, one could easily join an RDD of hashtags with the Twitter RDD in order to keep only the "hottest" tweets (tweets mentioning an article alongside a popular hashtag). In a streaming context, data streams cannot be joined directly, as each stream contains several RDDs. Instead, we transform one DStream with another using the transformWith function, which takes an anonymous function as an argument and applies it to each of their RDDs. We transform our Twitter stream with our hashtag stream by applying a function that filters out the unpopular tweets. Note that we use the Spark context to broadcast our current top n hashtags (limited to the top 100 here):

val joinFunc = (labeledUrls: RDD[(String, Array[String])], twitterTrend: RDD[(String, Int)]) => {

  val sc = twitterTrend.sparkContext
  val leaderBoard = twitterTrend
    .sortBy(_._2, ascending = false)
    .take(100)
    .map(_._1)

  val bLeaderBoard = sc.broadcast(leaderBoard)

  labeledUrls
    .flatMap { case (url, tags) =>
      tags map (tag => (url, tag))
    }
    .filter { case (url, tag) =>
      bLeaderBoard.value.contains(tag)
    }
    .groupByKey()
    .mapValues(_.toArray.distinct)

}

val labeledTrendUrls = labeledUrls
  .transformWith(twitterTrend, joinFunc)

Because the returned stream will only contain the "hottest" URLs, the amount of data should be drastically reduced. Although we cannot guarantee at this stage whether a URL points to proper text content (it could be a YouTube video or a simple image), at least we know we won't waste effort fetching content about irrelevant topics (with regard to a classification model).

Expanding shortened URLs

URLs available on Twitter are shortened. The only way to detect the true source programmatically is to "open the box" for all of them, sadly wasting lots of time and effort on potentially irrelevant content. It is also worth mentioning that many web scrapers do not handle shortened URLs efficiently (including the Goose scraper). We expand URLs by opening an HTTP connection, disabling redirects, and looking at the Location header. We also provide the method with a list of "untrusted" sources, that is, sources that, in the context of a classification model, do not provide any useful content (such as videos from https://www.youtube.com):

def expandUrl(url: String): String = {

  var connection: HttpURLConnection = null
  try {

    connection = new URL(url)
      .openConnection
      .asInstanceOf[HttpURLConnection]

    connection.setInstanceFollowRedirects(false)
    connection.setUseCaches(false)
    connection.setRequestMethod("GET")
    connection.connect()

    val redirectedUrl = connection.getHeaderField("Location")

    if (StringUtils.isNotEmpty(redirectedUrl)) {
      redirectedUrl
    } else {
      url
    }

  } catch {
    case e: Throwable => url
  } finally {
    if (connection != null)
      connection.disconnect()
  }
}

def expandUrls(tStream: DStream[(String, Array[String])]) = {
  tStream
    .map { case (url, tags) =>
      (HtmlHandler.expandUrl(url), tags)
    }
    .filter { case (url, tags) =>
      !untrustedSources.value.contains(url)
    }
}

val expandedUrls = expandUrls(labeledTrendUrls)
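
The untrustedSources broadcast variable referenced in the filter above is not defined in this excerpt. A minimal sketch of how it could be created is shown below, assuming it holds the exact URLs to discard after expansion; the list of addresses is purely illustrative:

// Hypothetical list of exact URLs that carry no useful text for the classifier
val untrustedUrls = Array(
  "https://www.youtube.com",
  "https://www.instagram.com"
)

// Broadcast once so every executor can filter expanded URLs locally
// (ssc is the StreamingContext created earlier)
val untrustedSources = ssc.sparkContext.broadcast(untrustedUrls)

To exclude whole domains rather than exact addresses, one could instead broadcast a set of host names and compare it against new URL(url).getHost in the filter.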

Tip

Similar to what was done in the previous chapter, we thoroughly catch any possible exception arising from the HTTP connection. Any uncaught exception (it could be a simple 404 error) would cause this task to be re-evaluated on different Spark executors before a fatal exception is raised, exiting our Spark application.
