We've already introduced web scrapers in a previous chapter, using the Goose library recompiled for Scala 2.11. Here we create a method that takes a DStream as input instead of an RDD and keeps only valid text content with at least 500 words. Finally, we return a stream of text alongside the associated (popular) hashtags:
import java.text.SimpleDateFormat

def fetchHtmlContent(tStream: DStream[(String, Array[String])]) = {
  tStream
    .reduceByKey((a, b) => (a ++ b).distinct)
    .mapPartitions { it =>
      val htmlFetcher = new HtmlHandler()
      val goose = htmlFetcher.getGooseScraper
      val sdf = new SimpleDateFormat("yyyyMMdd")
      it.map { case (url, tags) =>
        val content = htmlFetcher.fetchUrl(goose, url, sdf)
        (content, tags)
      }
      .filter { case (contentOpt, tags) =>
        contentOpt.isDefined &&
        contentOpt.get.body.isDefined &&
        contentOpt.get.body.get.split("\\s+").length >= 500
      }
      .map { case (contentOpt, tags) =>
        (contentOpt.get.body.get, tags)
      }
    }
}

val twitterContent = fetchHtmlContent(expandedUrls)
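Because Goose may fail to fetch or parse a page, fetchUrl returns an Option, and the body itself is optional; the filter keeps only documents whose body was successfully extracted and contains at least 500 words. The predicate can be sketched outside Spark as follows, where the Content case class and isValidContent helper are hypothetical stand-ins for the scraper's real return type:

```scala
// Hypothetical stand-in for the scraper's return type, for illustration only
case class Content(body: Option[String])

// Same guard as in the stream: defined content, defined body, at least 500 words
def isValidContent(contentOpt: Option[Content]): Boolean =
  contentOpt.isDefined &&
  contentOpt.get.body.isDefined &&
  contentOpt.get.body.get.split("\\s+").length >= 500

val longBody   = Some(Content(Some(Seq.fill(500)("word").mkString(" "))))
val shortBody  = Some(Content(Some("too short")))
val failedFetch: Option[Content] = None
// Only longBody passes the filter; shortBody and failedFetch are dropped
```

Filtering before extracting the body with .get is what keeps the subsequent map safe: by the time we call contentOpt.get.body.get, both options are guaranteed to be defined.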
We apply the same approach to GDELT data, where all the content (text, title, description, and so on) is also returned. Note the reduceByKey
method, which acts as a distinct function for our data stream:
def fetchHtmlContent(urlStream: DStream[String]) = {
  urlStream
    .map(_ -> 1)
    .reduceByKey(_ + _)
    .keys
    .mapPartitions { urls =>
      val sdf = new SimpleDateFormat("yyyyMMdd")
      val htmlHandler = new HtmlHandler()
      val goose = htmlHandler.getGooseScraper
      urls.map { url =>
        htmlHandler.fetchUrl(goose, url, sdf)
      }
    }
    .filter { content =>
      content.isDefined &&
      content.get.body.isDefined &&
      content.get.body.get.split("\\s+").length >= 500
    }
    .map(_.get)
}

val gdeltContent = fetchHtmlContent(gdeltUrlStream)
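The deduplication trick works because pairing every URL with a dummy count and reducing by key collapses all duplicates into a single entry per key; the keys call then discards the counts. The same idea can be sketched on a plain Scala collection, using groupBy to stand in for the shuffle (distinctViaReduce is a hypothetical helper name, not part of the Spark API):

```scala
// Hypothetical local-collection analogue of map(_ -> 1).reduceByKey(_ + _).keys
def distinctViaReduce(urls: Seq[String]): Set[String] =
  urls
    .map(_ -> 1)                         // pair every URL with a dummy count
    .groupBy(_._1)                       // gather identical keys, as the shuffle would
    .map { case (url, pairs) => url -> pairs.map(_._2).sum } // reduceByKey(_ + _)
    .keySet                              // .keys: drop the counts, keep unique URLs

val urls = Seq("http://a.com", "http://b.com", "http://a.com")
// distinctViaReduce(urls) yields a set with one entry per distinct URL
```

On a DStream this is often preferable to calling transform with distinct on each RDD, since it expresses the deduplication directly in terms of the keyed-stream API.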