Streaming sources

I will not be able to cover every stream type with a practical example in this section, but where this chapter is too small to include code, I will at least provide a description. In this chapter, I will cover TCP and file streams, as well as the Flume, Kafka, and Twitter streams, starting with a practical TCP-based example.

This chapter also examines stream processing architecture. For instance, what happens when the stream data delivery rate exceeds the data processing rate? Systems like Kafka address this issue by supporting multiple data topics and consumers.

TCP stream

It is possible to stream data over TCP/IP by calling the Spark streaming context method socketTextStream, specifying a hostname and a port number. The Scala-based code example in this section will receive data on port 10777, supplied using the Linux netcat command. The code sample starts by defining the package name, and importing the Spark, context, and streaming classes. The object class named stream2 is defined, as is the main method with its arguments:

package nz.co.semtechsolutions

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object stream2 {

  def main(args: Array[String]) {

The number of arguments passed to the class is checked to ensure that the hostname and the port number have been supplied. A Spark configuration object is created with an application name defined. The Spark and streaming contexts are then created. Then, a streaming batch time of 10 seconds is set:

    if ( args.length < 2 )
    {
      System.err.println("Usage: stream2 <host> <port>")
      System.exit(1)
    }

    val hostname = args(0).trim
    val portnum  = args(1).toInt

    val appName = "Stream example 2"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc  = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10) )

A DStream called rawDstream is created by calling the socketTextStream method of the streaming context, using the hostname and port number parameters.

    val rawDstream = ssc.socketTextStream( hostname, portnum )

A top-ten word count is created from the raw stream data by splitting each line into words on spaces. A (key, value) pair is then created as (word, 1), which is reduced by key, the key being the word. This produces a list of words and their associated counts. Next, the key and value are swapped, so the list becomes (count, word), and a sort is done on the key, which is now the count. Finally, the top 10 items in the RDD, within the DStream, are taken and printed out:

    val wordCount = rawDstream
                     .flatMap(line => line.split(" "))
                     .map(word => (word,1))
                     .reduceByKey(_+_)
                     .map(item => item.swap)
                     .transform(rdd => rdd.sortByKey(false))
                     .foreachRDD( rdd =>
                       { rdd.take(10).foreach(x=>println("List : " + x)) })

The code closes with the Spark streaming start and awaitTermination methods being called to start the stream processing, and to await process termination:

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end stream2

The data for this application is provided, as I stated previously, by the Linux netcat (nc) command. The Linux cat command dumps the contents of a log file, which is piped to nc. The -lk options force netcat to listen for connections, and to keep listening if a connection is lost. This example shows that the port being used is 10777:

[root@hc2nn log]# pwd
/var/log
[root@hc2nn log]# cat ./anaconda.storage.log | nc -lk 10777

The output from this TCP-based stream processing is shown here. The actual output is not as important as the method demonstrated. However, the data shows, as expected, a list of 10 log file words in descending count order. Note that the top word is empty because the stream was not filtered for empty words:

List : (17104,)
List : (2333,=)
List : (1656,:)
List : (1603,;)
List : (1557,DEBUG)
List : (564,True)
List : (495,False)
List : (411,None)
List : (356,at)
List : (335,object)
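
If the empty top word is unwanted, the empty tokens could be filtered out before counting. A minimal sketch, adding a filter step to the stream2 pipeline shown previously:

    val wordCount = rawDstream
                     .flatMap(line => line.split(" "))
                     .filter(word => word.nonEmpty)  // drop empty tokens
                     .map(word => (word,1))
                     .reduceByKey(_+_)
                     .map(item => item.swap)
                     .transform(rdd => rdd.sortByKey(false))
                     .foreachRDD( rdd =>
                       { rdd.take(10).foreach(x=>println("List : " + x)) })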

This is interesting if you want to stream data using Apache Spark streaming, based upon TCP/IP from a host and port. But what about more exotic methods? What if you wish to stream data from a messaging system, or via memory-based channels? What if you want to use some of the big data tools available today like Flume and Kafka? The next sections will examine these options, but first I will demonstrate how streams can be based upon files.

File streams

I have modified the Scala-based code example from the last section to monitor an HDFS-based directory, by calling the Spark streaming context method called textFileStream. Given this small change, I will not display all of the code. The application class is now called stream3, and it takes a single parameter—the HDFS directory. The directory path could also be on NFS or AWS S3 (all the code samples will be available with this book):

    val rawDstream = ssc.textFileStream( directory )
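
Since only the stream creation changes, a minimal sketch of how the full stream3 class might look (assuming the same package declaration and imports as stream2) is shown here:

    object stream3 {

      def main(args: Array[String]) {

        if ( args.length < 1 )
        {
          System.err.println("Usage: stream3 <directory>")
          System.exit(1)
        }

        val directory = args(0).trim

        val conf = new SparkConf()
        conf.setAppName("Stream example 3")

        val sc  = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10) )

        // monitor the directory for newly created text files
        val rawDstream = ssc.textFileStream( directory )

        // same top-ten word count as stream2
        rawDstream.flatMap(line => line.split(" "))
                  .map(word => (word,1))
                  .reduceByKey(_+_)
                  .map(item => item.swap)
                  .transform(rdd => rdd.sortByKey(false))
                  .foreachRDD( rdd =>
                    { rdd.take(10).foreach(x=>println("List : " + x)) })

        ssc.start()
        ssc.awaitTermination()

      } // end main

    } // end stream3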

The stream processing is the same as before. The stream is split into words, and the top-ten word list is printed. The only difference this time is that the data must be put into the HDFS directory while the application is running. This is achieved with the HDFS file system put command here:

[root@hc2nn log]# hdfs dfs -put ./anaconda.storage.log /data/spark/stream

As you can see, the HDFS directory used is /data/spark/stream/, and the text-based source log file is anaconda.storage.log (under /var/log/). As expected, the same word list and count is printed:

List : (17104,)
List : (2333,=)
……..
List : (564,True)
List : (495,False)
List : (411,None)
List : (356,at)
List : (335,object)

These are simple streaming methods based on TCP, and file system data. But what if I want to use some of the built-in streaming functionality within Spark streaming? This will be examined next. The Spark streaming Flume library will be used as an example.

Flume

Flume is an Apache open source project and product, which is designed to move large amounts of data at a big data scale. It is highly scalable, distributed, and reliable, working on the basis of data source, data sink, and data channels, as the diagram here, taken from the http://flume.apache.org/ website, shows:

[Figure: Flume agent architecture showing source, channel, and sink (from flume.apache.org)]

Flume uses agents to process data streams. As can be seen in the previous figure, an agent has a data source, a data processing channel, and a data sink. A clearer way to describe this is via the following figure. The channel acts as a queue for the sourced data and the sink passes the data to the next link in the chain.

[Figure: A Flume agent, with the channel acting as a queue between the source and the sink]

Flume agents can be chained together to form Flume architectures; the output of one agent's sink can be the input to a second agent. Apache Spark allows two approaches to using Apache Flume. The first is an Avro push-based in-memory approach, whereas the second one, still based on Avro, is a pull-based system using a custom Spark sink library.
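
The example in this chapter implements the push approach. For the pull-based alternative, the Flume agent's sink type would be set to the custom class org.apache.spark.streaming.flume.sink.SparkSink (provided by the spark-streaming-flume-sink library), and the Spark application would then poll that sink. A minimal sketch of the Spark side, assuming a streaming context ssc and the sink's host and port, might look like this:

    import org.apache.spark.streaming.flume._

    // pull events from the Spark sink that the Flume agent buffers into,
    // rather than having Flume push them to a Spark receiver
    val pollingStream = FlumeUtils.createPollingStream(ssc, hostname, portnum)

    pollingStream.map(e => new String(e.event.getBody.array()))
                 .print()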

I installed Flume via the Cloudera CDH 5.3 cluster manager, which installs a single agent. Checking the Linux command line, I can see that Flume version 1.5 is now available:

[root@hc2nn ~]# flume-ng version
Flume 1.5.0-cdh5.3.3
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: b88ce1fd016bc873d817343779dfff6aeea07706
Compiled by jenkins on Wed Apr  8 14:57:43 PDT 2015
From source with checksum 389d91c718e03341a2367bf4ef12428e

The Spark example that I will initially implement here is the Flume-based push approach, where Spark acts as a receiver, and Flume pushes the data to Spark. The following figure represents the structure that I will implement on a single node:

[Figure: The single node Flume push configuration used in this example]

The message data will be sent to port 10777 on a host called hc2r1m1 using the Linux netcat (nc) command. This will act as a source (source1) for the Flume agent (agent1), which will have an in-memory channel called channel1. The sink used by agent1 will be Apache Avro based, again on a host called hc2r1m1, but this time, the port number will be 11777. The Apache Spark Flume application stream4 (which I will describe shortly) will listen for Flume stream data on this port.

I start the streaming process by executing the netcat (nc) command next, against the 10777 port. Now, when I type text into this window, it will be used as a Flume source, and the data will be sent to the Spark application:

[hadoop@hc2nn ~]$ nc  hc2r1m1.semtech-solutions.co.nz  10777

In order to run my Flume agent, agent1, I have created a Flume configuration file called agent1.flume.cfg, which describes the agent's source, channel, and sink. The contents of the file are as follows. The first section defines the agent1 source, channel, and sink names.

agent1.sources  = source1
agent1.channels = channel1
agent1.sinks    = sink1

The next section defines source1 to be netcat based, running on the host called hc2r1m1, on port 10777:

agent1.sources.source1.channels=channel1
agent1.sources.source1.type=netcat
agent1.sources.source1.bind=hc2r1m1.semtech-solutions.co.nz
agent1.sources.source1.port=10777

The agent1 channel, channel1, is defined as a memory-based channel with a maximum event capacity of 1000 events:

agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1000

Finally, the agent1 sink, sink1, is defined as an Apache Avro sink on the host called hc2r1m1, on port 11777:

agent1.sinks.sink1.type=avro
agent1.sinks.sink1.hostname=hc2r1m1.semtech-solutions.co.nz
agent1.sinks.sink1.port=11777
agent1.sinks.sink1.channel=channel1

I have created a Bash script called flume.bash to run the Flume agent, agent1. It looks like this:

[hadoop@hc2r1m1 stream]$ more flume.bash

#!/bin/bash

# run the bash agent

flume-ng agent \
  --conf /etc/flume-ng/conf \
  --conf-file ./agent1.flume.cfg \
  -Dflume.root.logger=DEBUG,INFO,console \
  -name agent1

The script calls the Flume executable flume-ng, passing the agent1 configuration file. The call specifies the agent named agent1. It also specifies the Flume configuration directory to be /etc/flume-ng/conf/, the default value. Initially, I will use a netcat Flume source with a Scala-based example to show how data can be sent to an Apache Spark application. Then, I will show how an RSS-based data feed can be processed in a similar way. So initially, the Scala code that will receive the netcat data looks like this. The class package name and the application class name are defined. The necessary classes for Spark and Flume are imported. Finally, the main method is defined:

package nz.co.semtechsolutions

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._

object stream4 {

  def main(args: Array[String]) {

The host and port name arguments for the data stream are checked and extracted:

    if ( args.length < 2 )
    {
      System.err.println("Usage: stream4 <host> <port>")
      System.exit(1)
    }
    val hostname = args(0).trim
    val portnum  = args(1).toInt

    println("hostname : " + hostname)
    println("portnum  : " + portnum)

The Spark and streaming contexts are created. Then, the Flume-based data stream is created using the streaming context, the hostname, and the port number. The Flume-based class FlumeUtils is used to do this, by calling its createStream method:

    val appName = "Stream example 4"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc  = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10) )

    val rawDstream = FlumeUtils.createStream(ssc,hostname,portnum)

Finally, a stream event count is printed, and (for debug purposes while we test the stream) the stream content is dumped. After this, the stream context is started and configured to run until terminated via the application:

    rawDstream.count()
         .map(cnt => ">>>> Received events : " + cnt )
         .print()

    rawDstream.map(e => new String(e.event.getBody.array() ))
              .print

    ssc.start()
    ssc.awaitTermination()

  } // end main
} // end stream4

Having compiled it, I will run this application using spark-submit. As in the other chapters of this book, I will use a Bash-based script called run_stream.bash to execute the job. The script looks like this:

[hadoop@hc2r1m1 stream]$ more run_stream.bash

#!/bin/bash

SPARK_HOME=/usr/local/spark
SPARK_BIN=$SPARK_HOME/bin
SPARK_SBIN=$SPARK_HOME/sbin

JAR_PATH=/home/hadoop/spark/stream/target/scala-2.10/streaming_2.10-1.0.jar
CLASS_VAL=$1
CLASS_PARAMS="${*:2}"

STREAM_JAR=/usr/local/spark/lib/spark-examples-1.3.1-hadoop2.3.0.jar

cd $SPARK_BIN

./spark-submit \
  --class $CLASS_VAL \
  --master spark://hc2nn.semtech-solutions.co.nz:7077  \
  --executor-memory 100M \
  --total-executor-cores 50 \
  --jars $STREAM_JAR \
  $JAR_PATH \
  $CLASS_PARAMS

So, this script sets some Spark-based variables and a JAR library path for this job. It takes the Spark class to run as its first parameter, and passes all the other variables as parameters to the Spark application class. So, the execution of the application looks like this:

[hadoop@hc2r1m1 stream]$ ./run_stream.bash  \
                     nz.co.semtechsolutions.stream4 \
                     hc2r1m1.semtech-solutions.co.nz  \
                     11777

This means that the Spark application is ready, and is running as a Flume sink on port 11777. The Flume input is ready, running as a netcat task on port 10777. Now, the Flume agent, agent1, can be started using the Flume script called flume.bash to send the netcat source-based data to the Apache Spark Flume-based sink:

[hadoop@hc2r1m1 stream]$ ./flume.bash

Now, when the text is passed to the netcat session, it should flow through Flume, and be processed as a stream by Spark. Let's try it:

[hadoop@hc2nn ~]$ nc  hc2r1m1.semtech-solutions.co.nz 10777
I hope that Apache Spark will print this
OK
I hope that Apache Spark will print this
OK
I hope that Apache Spark will print this
OK

Three simple pieces of text have been added to the netcat session, and have been acknowledged with an OK, so that they can be passed to Flume. The debug output in the Flume session shows that the events (one per line) have been received and processed:

2015-07-06 18:13:18,699 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 41
2015-07-06 18:13:18,700 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 1
2015-07-06 18:13:18,990 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 41
2015-07-06 18:13:18,991 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 1
2015-07-06 18:13:19,270 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 41
2015-07-06 18:13:19,271 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 1

Finally, in the Spark stream4 application session, three events have been received, processed, and, in this case, dumped to the session to prove that the data arrived. Of course, this is not what you would normally do, but I wanted to prove data transit through this configuration:

-------------------------------------------
Time: 1436163210000 ms
-------------------------------------------
>>> Received events : 3
-------------------------------------------
Time: 1436163210000 ms
-------------------------------------------
I hope that Apache Spark will print this
I hope that Apache Spark will print this
I hope that Apache Spark will print this

This is interesting, but it is not really a production-worthy example of Spark Flume data processing. So, in order to demonstrate a potentially real data processing approach, I will change the Flume configuration file source details so that it uses an executable Perl script, as follows:

agent1.sources.source1.type=exec
agent1.sources.source1.command=./rss.perl

The Perl script referenced previously, rss.perl, simply acts as a source of Reuters science news. It receives the news as XML, and converts it into JSON format, cleaning the data of unwanted noise. First, it imports packages like LWP and XML::XPath to enable XML processing. Then, it specifies a science-based Reuters news data source, and creates a new LWP agent to process the data, similar to this:

#!/usr/bin/perl

use strict;
use LWP::UserAgent;
use XML::XPath;

my $urlsource="http://feeds.reuters.com/reuters/scienceNews" ;

my  $agent = LWP::UserAgent->new;

Then an infinite while loop is opened, and an HTTP GET request is carried out against the URL. The request is configured, and the agent makes the request via a call to the request method:

while()
{
  my  $req = HTTP::Request->new(GET => ($urlsource));

  $req->header('content-type' => 'application/json');
  $req->header('Accept'       => 'application/json');

  my $resp = $agent->request($req);

If the request is successful, then the XML data returned is defined as the decoded content of the request. Title information is extracted from the XML via an XPath call, using the path /rss/channel/item/title:

  if ( $resp->is_success )
  {
    my $xmlpage = $resp -> decoded_content;

    my $xp = XML::XPath->new( xml => $xmlpage );
    my $nodeset = $xp->find( '/rss/channel/item/title' );

    my @titles = () ;
    my $index = 0 ;

For each node in the title node set, the XML string data is extracted. It is cleaned of unwanted XML tags, and added to a Perl-based array called titles:

    foreach my $node ($nodeset->get_nodelist)
    {
      my $xmlstring = XML::XPath::XMLParser::as_string($node) ;

       $xmlstring =~ s/<title>//g;
       $xmlstring =~ s/<\/title>//g;
       $xmlstring =~ s/"//g;
       $xmlstring =~ s/,//g;

       $titles[$index] = $xmlstring ;
       $index = $index + 1 ;

    } # foreach find node

The same process is carried out for the description-based data in the request response XML. The XPath value used this time is /rss/channel/item/description. There are many more tags to be cleaned from the description data, so there are many more Perl search and replace operations that act on this data (s///g):

    my $nodeset = $xp->find( '/rss/channel/item/description' );

    my @desc = () ;
    $index = 0 ;

    foreach my $node ($nodeset->get_nodelist)
    {
       my $xmlstring = XML::XPath::XMLParser::as_string($node) ;

       $xmlstring =~ s/<img.+\/img>//g;
       $xmlstring =~ s/href=".+"//g;
       $xmlstring =~ s/src=".+"//g;
       $xmlstring =~ s/src='.+'//g;
       $xmlstring =~ s/<br.+\/>//g;
       $xmlstring =~ s/<\/div>//g;
       $xmlstring =~ s/<\/a>//g;
       $xmlstring =~ s/<a >\n//g;
       $xmlstring =~ s/<img >//g;
       $xmlstring =~ s/<img \/>//g;
       $xmlstring =~ s/<div.+>//g;
       $xmlstring =~ s/<title>//g;
       $xmlstring =~ s/<\/title>//g;
       $xmlstring =~ s/<description>//g;
       $xmlstring =~ s/<\/description>//g;
       $xmlstring =~ s/&lt;.+>//g;
       $xmlstring =~ s/"//g;
       $xmlstring =~ s/,//g;
       $xmlstring =~ s/\r|\n//g;

       $desc[$index] = $xmlstring ;
       $index = $index + 1 ;

    } # foreach find node

Finally, the XML-based title and description data is output in the RSS JSON format using a print command. The script then sleeps for 30 seconds, and requests more RSS news information to process:

    my $newsitems = $index ;
    $index = 0 ;

    for ($index=0; $index < $newsitems; $index++) {

      print "{"category": "science","
            . " "title": "" .  $titles[$index] . "","
            . " "summary": "" .  $desc[$index] . """
             . "}
";

    } # for rss items

  } # success ?

  sleep(30) ;

} # while

I have created a second Scala-based stream processing code example called stream5. It is similar to the stream4 example, but it now processes the RSS item data from the stream. Next, a case class is defined to hold the category, title, and summary from the RSS JSON information. An HDFS location is defined to store the resulting data that comes from the Flume channel:

    case class RSSItem(category : String, title : String, summary : String)

    val now: Long = System.currentTimeMillis

    val hdfsdir = "hdfs://hc2nn:8020/data/spark/flume/rss/"

The RSS stream data from the Flume-based event is converted into a string, and then parsed into the case class called RSSItem using the json4s read method. If there is event data, it is then written to an HDFS directory using the previous hdfsdir path:

    // assumes the json4s imports in stream5, for example:
    //   import org.json4s.DefaultFormats
    //   import org.json4s.jackson.Serialization.{read, write}

    rawDstream.map(record => {
        implicit val formats = DefaultFormats
        read[RSSItem](new String(record.event.getBody().array()))
      })
      .foreachRDD(rdd => {
        if (rdd.count() > 0) {
          rdd.map(item => {
            implicit val formats = DefaultFormats
            write(item)
          }).saveAsTextFile(hdfsdir+"file_"+now.toString())
        }
      })

Running this code sample, it is possible to see that the Perl rss script is producing data, because the Flume script output indicates that 80 events have been accepted and received:

2015-07-07 14:14:24,017 (agent-shutdown-hook) [DEBUG - org.apache.flume.source.ExecSource.stop(ExecSource.java:219)] Exec source with command:./news_rss_collector.py stopped. Metrics:SOURCE:source1{src.events.accepted=80, src.events.received=80, src.append.accepted=0, src.append-batch.accepted=0, src.open-connection.count=0, src.append-batch.received=0, src.append.received=0}

The Scala Spark application stream5 has processed 80 events in two batches:

>>>> Received events : 73
>>>> Received events : 7

And the events have been stored on HDFS, under the expected directory, as the Hadoop file system ls command shows here:

[hadoop@hc2r1m1 stream]$ hdfs dfs -ls /data/spark/flume/rss/
Found 2 items
drwxr-xr-x   - hadoop supergroup          0 2015-07-07 14:09 /data/spark/flume/rss/file_1436234439794
drwxr-xr-x   - hadoop supergroup          0 2015-07-07 14:14 /data/spark/flume/rss/file_1436235208370

Also, using the Hadoop file system cat command, it is possible to prove that the files on HDFS contain rss feed news-based data as shown here:

[hadoop@hc2r1m1 stream]$  hdfs dfs -cat /data/spark/flume/rss/file_1436235208370/part-00000 | head -1

{"category":"healthcare","title":"BRIEF-Aetna CEO says has not had specific conversations with DOJ on Humana - CNBC","summary":"* Aetna CEO Says Has Not Had Specific Conversations With Doj About Humana Acquisition - CNBC"}

This Spark stream-based example has used Apache Flume to transmit data from an RSS source, through Flume, to HDFS via a Spark consumer. This is a good example, but what if you want to publish data to a group of consumers? In the next section, I will examine Apache Kafka, a publish/subscribe messaging system, and determine how it can be used with Spark.

Kafka

Apache Kafka (http://kafka.apache.org/) is a top-level open-source Apache project. It is a fast and highly scalable big data publish/subscribe messaging system. It uses message brokers for data management, and ZooKeeper for configuration, so that data can be organized into consumer groups and topics. Data in Kafka is split into partitions. In this example, I will demonstrate a receiver-less Spark-based Kafka consumer, so that the Spark data partitions match the Kafka topic partitions without extra configuration.

In order to demonstrate Kafka-based message production and consumption, I will use the Perl RSS script from the last section as a data source. The data passing into Kafka and onto Spark will be Reuters RSS news data in the JSON format.

As topic messages are created by message producers, they are placed in partitions in message order. The messages in the partitions are retained for a configurable time period. Kafka also stores the offset value for each consumer, which is that consumer's position (in terms of message consumption) in that partition.

I am currently using Cloudera's CDH 5.3 Hadoop cluster. In order to install Kafka, I need to download a Kafka JAR library file from: http://archive.cloudera.com/csds/kafka/.

Having downloaded the file, and given that I am using CDH cluster manager, I then need to copy the file to the /opt/cloudera/csd/ directory on my NameNode CentOS server, so that it will be visible to install:

[root@hc2nn csd]# pwd
/opt/cloudera/csd

[root@hc2nn csd]# ls -l KAFKA-1.2.0.jar
-rw-r--r-- 1 hadoop hadoop 5670 Jul 11 14:56 KAFKA-1.2.0.jar

I then need to restart the Cloudera cluster manager server on my NameNode, or master server, so that the change will be recognized. This was done as root using the service command, which is as follows:

[root@hc2nn hadoop]# service cloudera-scm-server restart
Stopping cloudera-scm-server:                              [  OK  ]
Starting cloudera-scm-server:                              [  OK  ]

Now, the Kafka parcel should be visible within the CDH manager under Hosts | Parcels, as shown in the following figure. You can follow the usual download, distribution, and activate cycle for the CDH parcel installation:

[Figure: The Kafka parcel listed in the CDH manager under Hosts | Parcels]

I have installed Kafka message brokers on each Data Node, or Spark Slave machine in my cluster. I then set the Kafka broker ID values for each Kafka broker server, giving them a broker.id number of 1 through 4. As Kafka uses ZooKeeper for cluster data configuration, I wanted to keep all the Kafka data in a top level node called kafka in ZooKeeper. In order to do this, I set the Kafka ZooKeeper root value, called zookeeper.chroot, to /kafka. After making these changes, I restarted the CDH Kafka servers for the changes to take effect.
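
As an illustration only, the equivalent settings in a plain Kafka broker configuration (server.properties) would resemble the lines below. In CDH, these values are set through the Cloudera Manager configuration pages rather than edited by hand; the hostnames shown are those used in this chapter, and the exact entries are an assumption:

# one broker per Data Node, each with a unique ID
broker.id=1

# keep all Kafka znodes under a top-level /kafka node in ZooKeeper
# (Cloudera Manager exposes this as zookeeper.chroot; in a plain Kafka
# install the chroot is appended to the zookeeper.connect string)
zookeeper.connect=hc2r1m2:2181,hc2r1m3:2181,hc2r1m4:2181/kafka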

With Kafka installed, I can check the scripts available for testing. The following listing shows Kafka-based scripts for message producers and consumers, as well as scripts for managing topics, and checking consumer offsets. These scripts will be used in this section in order to demonstrate Kafka functionality:

[hadoop@hc2nn ~]$ ls /usr/bin/kafka*

/usr/bin/kafka-console-consumer         /usr/bin/kafka-run-class
/usr/bin/kafka-console-producer         /usr/bin/kafka-topics
/usr/bin/kafka-consumer-offset-checker

In order to run the installed Kafka servers, I need to have the broker server ID (broker.id) values set, or an error will occur. Once Kafka is installed and running, I will need to prepare a message producer script. The simple Bash script given next, called kafka.bash, defines a comma-separated broker list of hosts and ports. It also defines a topic called rss. It then calls the Perl script rss.perl to generate the RSS-based data. This data is then piped into the Kafka producer script called kafka-console-producer to be sent to Kafka.

[hadoop@hc2r1m1 stream]$ more kafka.bash

#!/bin/bash

BROKER_LIST="hc2r1m1:9092,hc2r1m2:9092,hc2r1m3:9092,hc2r1m4:9092"
TOPIC="rss"

./rss.perl | /usr/bin/kafka-console-producer --broker-list $BROKER_LIST --topic $TOPIC

Notice that I have not yet created the Kafka topic. When a topic is created in Kafka, the number of partitions can be specified. In the following example, the kafka-topics script has been called with the create option. The number of partitions has been set to 5, and the data replication factor has been set to 3. The ZooKeeper server string has been defined as hc2r1m2-4 with a port number of 2181. Also note that the top level ZooKeeper Kafka node has been defined as /kafka in the ZooKeeper string:

/usr/bin/kafka-topics \
  --create  \
  --zookeeper hc2r1m2:2181,hc2r1m3:2181,hc2r1m4:2181/kafka \
  --replication-factor 3  \
  --partitions 5  \
  --topic rss

I have also created a Bash script called kafka_list.bash for use during testing, which checks all the Kafka topics that have been created, as well as the Kafka consumer offsets. It calls the kafka-topics command with a list option and a ZooKeeper string to get a list of created topics. It then calls the Kafka script called kafka-consumer-offset-checker with a ZooKeeper string, the topic name, and a group name to get a list of consumer offset values. Using this script, I can check that my topics are created, and that the topic data is being consumed correctly:

[hadoop@hc2r1m1 stream]$ cat kafka_list.bash

#!/bin/bash

ZOOKEEPER="hc2r1m2:2181,hc2r1m3:2181,hc2r1m4:2181/kafka"
TOPIC="rss"
GROUP="group1"

echo ""
echo "================================"
echo " Kafka Topics "
echo "================================"

/usr/bin/kafka-topics --list --zookeeper $ZOOKEEPER

echo ""
echo "================================"
echo " Kafka Offsets "
echo "================================"
/usr/bin/kafka-consumer-offset-checker \
  --group $GROUP \
  --topic $TOPIC \
  --zookeeper $ZOOKEEPER

Next, I need to create the Apache Spark Scala-based Kafka consumer code. As I said, I will create a receiver-less example, so that the data partitions match in both Kafka and Spark. The example is called stream6. First, the package is defined, and the classes are imported for Kafka, Spark, the context, and streaming. Then, the object class called stream6 and the main method are defined. The code looks like this:

package nz.co.semtechsolutions

import kafka.serializer.StringDecoder

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object stream6 {

  def main(args: Array[String]) {

Next, the class parameters (the brokers string, group ID, and topics) are checked and processed. If the class parameters are incorrect, then an error is printed, and execution stops, else the parameter variables are defined:

    if ( args.length < 3 )
    {
      System.err.println("Usage: stream6 <brokers> <groupid> <topics>
")
      System.err.println("<brokers> = host1:port1,host2:port2
")
      System.err.println("<groupid> = group1
")
      System.err.println("<topics>  = topic1,topic2
")
      System.exit(1)
    }

    val brokers = args(0).trim
    val groupid = args(1).trim
    val topics  = args(2).trim

    println("brokers : " + brokers)
    println("groupid : " + groupid)
    println("topics  : " + topics)

The Spark context is defined in terms of an application name. Again, the Spark master URL has been left as the default. The streaming context is created using the Spark context. I have left the stream batch interval at 10 seconds, which is the same as in the last example. However, you can set it to an interval of your choice (a sketch of making it configurable follows the code below):

    val appName = "Stream example 6"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc  = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10) )
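
The batch interval is hard-coded to 10 seconds here. If it needs to be configurable, it could instead be taken from an extra command-line argument; a minimal sketch (the fourth argument is a hypothetical addition, not part of stream6):

    // use a fourth argument as the batch interval in seconds, defaulting to 10
    val batchSecs = if ( args.length > 3 ) args(3).toInt else 10
    val ssc = new StreamingContext(sc, Seconds(batchSecs) )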

Next, the broker list and group ID are set up as parameters. These values are then used to create a Kafka-based Spark stream called rawDStream:

    val topicsSet = topics.split(",").toSet
    val kafkaParams : Map[String, String] =
        Map("metadata.broker.list" -> brokers,
            "group.id" -> groupid )

    val rawDstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

I have again printed the stream event count for debug purposes, so that I know when the application is receiving and processing the data:

    rawDstream.count().map(cnt => ">>>>>>>>>>>>>>> Received events : " + cnt ).print()

The HDFS location for the Kafka data has been defined as /data/spark/kafka/rss/. The message values have been mapped from the DStream into the variable lines. Using the foreachRDD method, a data count check is carried out on the lines variable, before saving the data into HDFS using the saveAsTextFile method:

    val now: Long = System.currentTimeMillis

    val hdfsdir = "hdfs://hc2nn:8020/data/spark/kafka/rss/"

    val lines = rawDstream.map(record => record._2)

    lines.foreachRDD(rdd => {
            if (rdd.count() > 0) {
              rdd.saveAsTextFile(hdfsdir+"file_"+now.toString())
            }
    })

Finally, the Scala script closes by starting the stream processing, and setting the application class to run until terminated with awaitTermination:

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end stream6

With all of the scripts explained, and the Kafka CDH brokers running, it is time to examine the Kafka configuration, which, if you remember, is maintained by Apache ZooKeeper (all of the code samples that have been described so far will be released with the book). I will use the zookeeper-client tool, and connect to the ZooKeeper server on the host called hc2r1m2 on port 2181. As you can see here, I have received a connected message from the client session:

[hadoop@hc2r1m1 stream]$ /usr/bin/zookeeper-client -server hc2r1m2:2181


[zk: hc2r1m2:2181(CONNECTED) 0]

If you remember, I specified the top level ZooKeeper directory for Kafka to be /kafka. If I examine this now via a client session, I can see the Kafka ZooKeeper structure. I will be interested in brokers (the CDH Kafka broker servers), and consumers (the previous Spark Scala code). The ZooKeeper ls commands show that the four Kafka servers have registered with ZooKeeper, and are listed by their broker.id configuration values one to four:

[zk: hc2r1m2:2181(CONNECTED) 2] ls /kafka
[consumers, config, controller, admin, brokers, controller_epoch]

[zk: hc2r1m2:2181(CONNECTED) 3] ls /kafka/brokers
[topics, ids]

[zk: hc2r1m2:2181(CONNECTED) 4] ls /kafka/brokers/ids
[3, 2, 1, 4]

I will create the topic that I want to use for this test using the Kafka script kafka-topics with a create flag. I do this manually, because I can demonstrate the definition of the data partitions while I do it. Note that I have set the partitions in the Kafka topic rss to five as shown in the following piece of code. Note also that the ZooKeeper connection string for the command has a comma-separated list of ZooKeeper servers, terminated by the top level ZooKeeper Kafka directory called /kafka. This means that the command puts the new topic in the proper place:

[hadoop@hc2nn ~]$ /usr/bin/kafka-topics \
>   --create  \
>   --zookeeper hc2r1m2:2181,hc2r1m3:2181,hc2r1m4:2181/kafka \
>   --replication-factor 3  \
>   --partitions 5  \
>   --topic rss

Created topic "rss".

Now, when I use the ZooKeeper client to check the Kafka topic configuration, I can see the correct topic name, and the expected number of the partitions:

[zk: hc2r1m2:2181(CONNECTED) 5] ls /kafka/brokers/topics
[rss]

[zk: hc2r1m2:2181(CONNECTED) 6] ls /kafka/brokers/topics/rss
[partitions]

[zk: hc2r1m2:2181(CONNECTED) 7] ls /kafka/brokers/topics/rss/partitions
[3, 2, 1, 0, 4]

This describes the configuration for the Kafka broker servers in ZooKeeper, but what about the data consumers? Well, the following listing shows where the data will be held. Remember though, at this time, there is no consumer running, so it is not represented in ZooKeeper:

[zk: hc2r1m2:2181(CONNECTED) 9]  ls /kafka/consumers
[]
[zk: hc2r1m2:2181(CONNECTED) 10] quit

In order to start this test, I will run my Kafka data producer and consumer scripts. I will also check the output of the Spark application class, and check the Kafka partition offsets and HDFS to make sure that the data has arrived. This is quite complicated, so I will add a diagram in the following figure to explain the test architecture.

The Perl script called rss.perl will be used to provide a data source for a Kafka data producer, which will feed data into the CDH Kafka broker servers. The data will be stored in ZooKeeper, in the structure that has just been examined, under the top level node called /kafka. The Apache Spark Scala-based application will then act as a Kafka consumer, and read the data that it will store under HDFS.

[Figure: The Kafka test architecture: rss.perl feeds the Kafka producer, the CDH brokers store configuration in ZooKeeper under /kafka, and the Spark Scala consumer writes the data to HDFS]

In order to try and explain the complexity here, I will also examine my method of running the Apache Spark class. It will be started via the spark-submit command. Remember again that all of these scripts will be released with this book, so that you can examine them in your own time. I always use scripts for server test management, so that I encapsulate complexity, and command execution is quickly repeatable. The script, run_stream.bash, is like many example scripts that have already been used in this chapter, and this book. It accepts a class name and the class parameters, and runs the class via spark-submit:

[hadoop@hc2r1m1 stream]$ more run_stream.bash

#!/bin/bash

SPARK_HOME=/usr/local/spark
SPARK_BIN=$SPARK_HOME/bin
SPARK_SBIN=$SPARK_HOME/sbin

JAR_PATH=/home/hadoop/spark/stream/target/scala-2.10/streaming_2.10-1.0.jar
CLASS_VAL=$1
CLASS_PARAMS="${*:2}"

STREAM_JAR=/usr/local/spark/lib/spark-examples-1.3.1-hadoop2.3.0.jar
cd $SPARK_BIN

./spark-submit \
  --class $CLASS_VAL \
  --master spark://hc2nn.semtech-solutions.co.nz:7077  \
  --executor-memory 100M \
  --total-executor-cores 50 \
  --jars $STREAM_JAR \
  $JAR_PATH \
  $CLASS_PARAMS

I then use a second script, run_kafka_example.bash, which calls the previous run_stream.bash script to execute the Kafka consumer code in the stream6 application class. Note that this script sets up the full application class name and the broker server list. It also sets up the topic name, called rss, to use for data consumption. Finally, it defines a consumer group called group1. Remember that Kafka is a publish/subscribe message brokering system. There may be many producers and consumers organized by topic, group, and partition:

[hadoop@hc2r1m1 stream]$ more run_kafka_example.bash

#!/bin/bash

RUN_CLASS=nz.co.semtechsolutions.stream6
BROKERS="hc2r1m1:9092,hc2r1m2:9092,hc2r1m3:9092,hc2r1m4:9092"
GROUPID=group1
TOPICS=rss

# run the Apache Spark Kafka example

./run_stream.bash $RUN_CLASS \
                  $BROKERS \
                  $GROUPID \
                  $TOPICS

So, I will start the Kafka consumer by running the run_kafka_example.bash script, which in turn will run the previous stream6 Scala code using spark-submit. While monitoring Kafka data consumption using the script called kafka_list.bash, I was able to get the kafka-consumer-offset-checker script to list the Kafka-based topics, but for some reason, it will not check the correct path (under /kafka in ZooKeeper) when checking the offsets as shown here:

[hadoop@hc2r1m1 stream]$ ./kafka_list.bash

================================
 Kafka Topics
================================
__consumer_offsets
rss

================================
 Kafka Offsets
================================
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/group1/offsets/rss/4.
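
One likely contributing factor is that the receiver-less direct stream manages its offsets inside Spark itself, rather than committing them to the consumer path in ZooKeeper, so the offset checker can find no node for group1. If offset visibility is needed during testing, the ranges can be printed from within the Spark application instead; a minimal sketch, assuming the rawDstream created with createDirectStream in stream6:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    rawDstream.foreachRDD( rdd => {
      // each RDD produced by the direct stream carries its Kafka offset ranges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      offsetRanges.foreach( o =>
        println("topic=" + o.topic + " partition=" + o.partition +
                " fromOffset=" + o.fromOffset + " untilOffset=" + o.untilOffset) )
    })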

By starting the Kafka producer rss feed using the script kafka.bash, I can now start feeding the RSS-based data through Kafka into Spark, and then into HDFS. Periodically checking the spark-submit session output, it can be seen that events are passing through the Spark-based Kafka DStream. The following output comes from the stream count in the Scala code, and shows that at that point, 28 events were processed:

-------------------------------------------
Time: 1436834440000 ms
-------------------------------------------
>>>>>>>>>>>>>>> Received events : 28

By checking HDFS under the /data/spark/kafka/rss/ directory, via the Hadoop file system ls command, it can be seen that there is now data stored on HDFS:

[hadoop@hc2r1m1 stream]$ hdfs dfs -ls /data/spark/kafka/rss
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907

By checking the contents of this directory, it can be seen that an HDFS part data file exists, which should contain the RSS-based data from Reuters:

[hadoop@hc2r1m1 stream]$ hdfs dfs -ls /data/spark/kafka/rss/file_1436833769907
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907/_SUCCESS
-rw-r--r--   3 hadoop supergroup       8205 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907/part-00001

Using the Hadoop file system cat command, I can dump the contents of this HDFS-based file. I have used the Linux head command to limit the data to save space. Clearly, this is Reuters science-based RSS information that the Perl script rss.perl has converted from XML to JSON format.

[hadoop@hc2r1m1 stream]$ hdfs dfs -cat /data/spark/kafka/rss/file_1436833769907/part-00001 | head -2

{"category": "science", "title": "Bear necessities: low metabolism lets pandas survive on bamboo", "summary": "WASHINGTON (Reuters) - Giant pandas eat vegetables even though their bodies are better equipped to eat meat. So how do these black-and-white bears from the remote misty mountains of central China survive on a diet almost exclusively of a low-nutrient food like bamboo?"}

{"category": "science", "title": "PlanetiQ tests sensor for commercial weather satellites", "summary": "CAPE CANAVERAL (Reuters) - PlanetiQ a privately owned company is beginning a key test intended to pave the way for the first commercial weather satellites."}

This ends the Kafka example. It can be seen that the Kafka brokers have been installed and configured, and that an RSS data-based Kafka producer has fed data into the brokers. It has been proved, using the ZooKeeper client, that the Kafka architecture of brokers, topics, and partitions has been set up in ZooKeeper. Finally, it has been shown, using the Apache Spark-based Scala code in the stream6 application, that the Kafka data has been consumed and saved to HDFS.
