Consuming data streams

Similar to a batch processing job, we create a new Spark application using a SparkConf object and a context. In a streaming application, the context is created with a batch size parameter that applies to any incoming stream (the GDELT and Twitter layers, being part of the same context, will both be tied to the same batch size). Since GDELT data is published every 15 minutes, our batch size will naturally be 15 minutes, as we want to predict categories on a pseudo real-time basis:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val sparkConf = new SparkConf().setAppName("GZET")
val ssc = new StreamingContext(sparkConf, Minutes(15))
val sc = ssc.sparkContext

Creating a GDELT data stream

There are many ways of publishing external data into a Spark Streaming application. One could open a simple socket and start publishing data with the netcat utility, or stream data through a Flume agent monitoring an external directory. Production systems usually use Kafka as the default broker for both its high throughput and its overall reliability (data is replicated over multiple partitions). We could certainly use the same Apache NiFi stack as described in Chapter 10, Story De-duplication and Mutation, but we want to describe here a much easier route: simply "piping" article URLs (extracted from GDELT records) into our Spark application through a Kafka topic.

Creating a Kafka topic

Creating a new Kafka topic is quite easy (in a test environment). Extra care must be taken in production environments when choosing the right number of partitions and the replication factor. Also note that a proper ZooKeeper quorum must be installed and configured. We start the Kafka server and create a topic named gzet, using one partition only and a replication factor of 1:

$ kafka-server-start /usr/local/etc/kafka/server.properties > /var/log/kafka/kafka-server.log 2>&1 &

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gzet

Publishing content to a Kafka topic

We can feed the Kafka queue by piping content to the kafka-console-producer utility. We use the awk, sort, and uniq commands, as we are only interested in the distinct URLs from GDELT records (the URL is the last field of our tab-separated values, hence $NF):

$ cat ${FILE} | awk '{print $NF}' | sort | uniq | kafka-console-producer --broker-list localhost:9092 --topic gzet

For convenience, we create a simple bash script that listens for new files on the GDELT website, downloads and extracts content to a temporary directory, and executes the preceding command. The script can be found in our code repository (gdelt-stream.sh).

Consuming Kafka from Spark Streaming

Kafka is an official Spark Streaming source, available through the following dependency:

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
   <version>2.0.0</version>
</dependency>

We define the ZooKeeper quorum together with the number of consumer threads that will read data from the gzet topic (10 here). Since the receiver-based API returns (key, message) tuples, we keep only the message itself (the URLs piped to our Kafka producer) in order to build our stream of article URLs:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

def createGdeltStream(ssc: StreamingContext): DStream[String] = {
  KafkaUtils.createStream(
    ssc,
    "localhost:2181",   // ZooKeeper quorum
    "gzet",             // consumer group id
    Map("gzet" -> 10)   // topic -> number of consumer threads
  ).map { case (_, url) => url }  // keep only the message (the article URL)
}

val gdeltUrlStream: DStream[String] = createGdeltStream(ssc)
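
A quick way to check that URLs are flowing in is to print a sample of each batch to the console. This is a debugging sketch only; the streaming context should not be started until every stream (both GDELT and Twitter) has been declared:

// Print the first ten URLs of every 15-minute batch (debugging only)
gdeltUrlStream.print(10)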

Figure 8: GDELT online layer

In the preceding figure we show how GDELT data will be processed in batches by listening to a Kafka topic. Each batch will be analyzed and the articles downloaded using the HTML parser described in Chapter 6, Scraping Link-Based External Data.
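
As an illustration of that per-batch processing, the following sketch downloads the raw content of each URL; the fetchArticle helper is a naive, hypothetical stand-in for the Chapter 6 HTML parser, not the parser itself:

import scala.io.Source
import scala.util.Try

// Hypothetical stand-in for the Chapter 6 HTML parser: naively fetch the raw HTML
def fetchArticle(url: String): Option[String] =
  Try(Source.fromURL(url, "UTF-8").mkString).toOption

// Download articles for each 15-minute batch of distinct URLs
val articleStream: DStream[(String, String)] = gdeltUrlStream
  .mapPartitions { urls =>
    urls.flatMap(url => fetchArticle(url).map(body => (url, body)))
  }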

Creating a Twitter data stream

The obvious constraint of using Twitter is scale. With over 500 million tweets a day, our application needs to be written in the most distributed and scalable way in order to handle the large amount of input data. Furthermore, even if only 2% of these tweets contained a reference to an external URL, that would still leave millions of URLs to fetch and analyze per day (in addition to the thousands coming from GDELT). Because we do not have a dedicated architecture to handle this volume and velocity of data for the purpose of this book, we will be using the 1% sample stream provided for free by Twitter. One simply needs to register a new application on the Twitter website (https://apps.twitter.com) and retrieve both its associated application settings and authorization tokens.

Note, however, that the Twitter connector is no longer part of core Spark Streaming since version 2.0.0. As part of the Apache Bahir project (http://bahir.apache.org/), it can be used with the following Maven dependency:

<dependency>
   <groupId>org.apache.bahir</groupId>
   <artifactId>spark-streaming-twitter_2.11</artifactId>
   <version>2.0.0</version>
</dependency>

Because Spark Streaming uses twitter4j under the hood, the configuration is done using the ConfigurationBuilder object from the twitter4j library:

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

def getTwitterConfiguration = {

  val builder = new ConfigurationBuilder()

  builder.setOAuthConsumerKey("XXXXXXXXXXXXXXX")
  builder.setOAuthConsumerSecret("XXXXXXXXXXXX")
  builder.setOAuthAccessToken("XXXXXXXXXXXXXXX")
  builder.setOAuthAccessTokenSecret("XXXXXXXXX")

  val configuration = builder.build()
  Some(new OAuthAuthorization(configuration))

}
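
Hardcoding credentials is shown here for brevity only. As a minimal sketch (the environment variable names below are ours, not part of any API), the same configuration could be built from environment variables instead:

// A sketch: read Twitter credentials from hypothetical environment variables
def getTwitterConfigurationFromEnv: Option[OAuthAuthorization] = for {
  consumerKey    <- sys.env.get("TWITTER_CONSUMER_KEY")
  consumerSecret <- sys.env.get("TWITTER_CONSUMER_SECRET")
  accessToken    <- sys.env.get("TWITTER_ACCESS_TOKEN")
  accessSecret   <- sys.env.get("TWITTER_ACCESS_TOKEN_SECRET")
} yield {
  val builder = new ConfigurationBuilder()
  builder.setOAuthConsumerKey(consumerKey)
  builder.setOAuthConsumerSecret(consumerSecret)
  builder.setOAuthAccessToken(accessToken)
  builder.setOAuthAccessTokenSecret(accessSecret)
  new OAuthAuthorization(builder.build())
}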

We create our data stream by supplying an array of keywords (which can be specific hashtags). In our case, we want to listen to the whole 1% sample, regardless of the keywords or hashtags used (discovering new hashtags is actually part of our application), hence we provide an empty array:

import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.Status

def createTwitterStream(ssc: StreamingContext): DStream[Status] = {
  TwitterUtils.createStream(
    ssc,
    getTwitterConfiguration,
    Array[String]()
  )
}

val twitterStream: DStream[Status] = createTwitterStream(ssc)

The returned object is a stream of Status, a twitter4j class embedding all of the tweet's properties, such as the ones reported in the following snippet. In the scope of this application, we will only be interested in the getText method, which returns the tweet body:

val body: String = status.getText()
val user: User = status.getUser()
val contributors: Array[Long] = status.getContributors()
val createdAt: java.util.Date = status.getCreatedAt()
../..
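
Since only the tweet body matters for the rest of the pipeline, a simple map gives us a DStream of raw text (the variable name below is ours). Once both streams have been declared, the context can finally be started:

// Keep only the raw tweet text for downstream analysis
val tweetBodyStream: DStream[String] = twitterStream.map(_.getText)

// Start consuming both the GDELT and Twitter streams
ssc.start()
ssc.awaitTermination()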