Flume

Flume is an Apache open source project and product, which is designed to move large amounts of data at a big data scale. It is highly scalable, distributed, and reliable, working on the basis of data source, data sink, and data channels, as shown in the following diagram taken from http://flume.apache.org/:

Flume uses agents to process data streams. As can be seen in the previous figure, an agent has a data source, data processing channel, and data sink. A clearer way to describe this flow is via the figure we just saw. The channel acts as a queue for the sourced data and the sink passes the data to the next link in the chain.

Flume agents can form Flume architectures; the output of one agent's sink can be the input to a second agent. Apache Spark allows two approaches to using Apache Flume. The first is an Avro push-based in-memory approach, whereas the second one, still based on Avro, is a pull-based system using a custom Spark sink library. We are using Flume version 1.5 for this example:

[root@hc2nn ~]# flume-ng version
Flume 1.5.0-cdh5.3.3
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: b88ce1fd016bc873d817343779dfff6aeea07706
Compiled by jenkins on Wed Apr 8 14:57:43 PDT 2015
From source with checksum 389d91c718e03341a2367bf4ef12428e

The Flume-based Spark example that we will initially implement here is the Flume-based push approach, where Spark acts as a receiver and Flume pushes the data to Spark. The following figure represents the structure that we will implement on a single node:

The message data will be sent to port 10777 on a host called hc2r1m1 using the Linux netcat (nc) command. This will act as a source (source1) for the Flume agent (agent1), which will have an in-memory channel called channel1. The sink used by agent1 will be Apache Avro-based, again on a host called hc2r1m1, but this time, the port number will be 11777. The Apache Spark Flume application stream4 (which we will describe shortly) will listen for Flume stream data on this port.

We start the streaming process by executing the nc command against the 10777 port. Now, when we type text in this window, it will be used as a Flume source and the data will be sent to the Spark application:

[hadoop@hc2nn ~]$ nc  hc2r1m1.semtech-solutions.co.nz  10777

In order to run the Flume agent, agent1, we have created a Flume configuration file called agent1.flume.cfg, which describes the agent's source, channel, and sink. The contents of the file are as follows. The first section defines the agent1 source, channel, and sink names.

agent1.sources  = source1
agent1.channels = channel1
agent1.sinks = sink1

The next section defines source1 to be netcat-based, running on the host called hc2r1m1 and the 10777 port:

agent1.sources.source1.channels=channel1
agent1.sources.source1.type=netcat
agent1.sources.source1.bind=hc2r1m1.semtech-solutions.co.nz
agent1.sources.source1.port=10777

The agent1 channel, channel1, is defined as a memory-based channel with a maximum event capacity of 1000 events:

agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1000

Finally, the agent1 sink, sink1, is defined as an Apache Avro sink on the host called hc2r1m1 and the 11777 port:

agent1.sinks.sink1.type=avro
agent1.sinks.sink1.hostname=hc2r1m1.semtech-solutions.co.nz
agent1.sinks.sink1.port=11777
agent1.sinks.sink1.channel=channel1

We have created a Bash script called flume.bash to run the Flume agent, agent1. It looks as follows:

[hadoop@hc2r1m1 stream]$ more flume.bash
#!/bin/bash
# run the bash agent
flume-ng agent
--conf /etc/flume-ng/conf
--conf-file ./agent1.flume.cfg
-Dflume.root.logger=DEBUG,INFO,console
-name agent1

The script calls the Flume executable flume-ng, passing the agent1 configuration file. The call specifies the agent named agent1. It also specifies the Flume configuration directory to be /etc/flume-ng/conf/, the default value. Initially, we will use a netcat Flume source with a Scala-based example to show how data can be sent to an Apache Spark application. Then, we will show how an RSS-based data feed can be processed in a similar way. So initially, the Scala code that will receive the netcat data looks like this. The application class name is defined. The necessary classes for Spark and Flume are imported. Finally, the main method is defined:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._

object stream4 {
def main(args: Array[String]) {
//The host and port name arguments for the data stream are checked and extracted:
if ( args.length < 2 ) {
System.err.println("Usage: stream4 <host> <port>")
System.exit(1)
}
val hostname = args(0).trim
val portnum = args(1).toInt
println("hostname : " + hostname)
println("portnum : " + portnum)

The Spark and Streaming contexts are created. Then, the Flume-based data stream is created using the stream context host and port number. The Flume-based class, FlumeUtils, has been used to do this by calling its createStream method:

val appName = "Stream example 4"
val conf = new SparkConf()
conf.setAppName(appName)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10) )
val rawDstream = FlumeUtils.createStream(ssc,hostname,portnum)

Finally, a stream event count is printed and (for debug purposes while we test the stream) the stream content is dumped. After this, the stream context is started and configured to run until terminated via the application:

    rawDstream.count()
.map(cnt => ">>>> Received events : " + cnt )
.print()
rawDstream.map(e => new String(e.event.getBody.array() ))
.print
ssc.start()
ssc.awaitTermination()
} // end main
} // end stream4

Having compiled it, we will run this application using spark-submit. In some of the other chapters of this book, we will use a Bash-based script called run_stream.bash to execute the job. The script looks as follows:

[hadoop@hc2r1m1 stream]$ more run_stream.bash
#!/bin/bash
SPARK_HOME=/usr/local/spark
SPARK_BIN=$SPARK_HOME/bin
SPARK_SBIN=$SPARK_HOME/sbin
JAR_PATH=/home/hadoop/spark/stream/target/scala-2.10/streaming_2.10-1.0.jar
CLASS_VAL=$1
CLASS_PARAMS="${*:2}"
STREAM_JAR=/usr/local/spark/lib/spark-examples-1.3.1-hadoop2.3.0.jar
cd $SPARK_BIN
./spark-submit
--class $CLASS_VAL
--master spark://hc2nn.semtech-solutions.co.nz:7077
--executor-memory 100M
--total-executor-cores 50
--jars $STREAM_JAR
$JAR_PATH
$CLASS_PARAMS

So, this script sets some Spark-based variables and a JAR library path for this job. It takes the Spark class to run as its first parameter. It passes all the other variables as parameters to the Spark application class job. So, the execution of the application looks as follows:

[hadoop@hc2r1m1 stream]$ ./run_stream.bash stream4 hc2r1m1 11777

This means that the Spark application is ready and is running as a Flume sink on port 11777. The Flume input is ready, running as a netcat task on port 10777. Now, the Flume agent, agent1, can be started using the Flume script called flume.bash to send the netcat source-based data to the Apache Spark Flume-based sink:

 [hadoop@hc2r1m1 stream]$ ./flume.bash

Now, when the text is passed to the netcat session, it should flow through Flume and be processed as a stream by Spark. Let's try it:

[hadoop@hc2nn ~]$ nc  hc2r1m1.semtech-solutions.co.nz 10777
I hope that Apache Spark will print this
OK
I hope that Apache Spark will print this
OK
I hope that Apache Spark will print this
OK

Three simple pieces of text have been added to the netcat session and acknowledged with an OK so that they can be passed to Flume. The debug output in the Flume session shows that the events (one per line ) have been received and processed:

2015-07-06 18:13:18,699 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 41
2015-07-06 18:13:18,700 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 1
2015-07-06 18:13:18,990 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 41
2015-07-06 18:13:18,991 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 1
2015-07-06 18:13:19,270 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 41
2015-07-06 18:13:19,271 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 1

Finally, in the Spark stream4 application session, three events have been received and processed; in this case, they have been dumped to the session to prove the point that the data arrived. Of course, this is not what you would normally do, but we wanted to prove data transit through this configuration:

-------------------------------------------
Time: 1436163210000 ms
-------------------------------------------
>>> Received events : 3
-------------------------------------------
Time: 1436163210000 ms
-------------------------------------------
I hope that Apache Spark will print this
I hope that Apache Spark will print this
I hope that Apache Spark will print this

 

This is interesting, but it is not really a production-worthy example of Spark Flume data processing. So, in order to demonstrate a potentially real data processing approach, we will change the Flume configuration file source details so that it uses a Perl script, which is executable as follows:

agent1.sources.source1.type=exec
agent1.sources.source.command=./rss.perl

The Perl script, which has been referenced previously, rss.perl, just acts as a source of Reuters science news. It receives the news as XML and converts it into JSON format. It also cleans the data of unwanted noise. First, it imports packages such as LWP and XML::XPath to enable XML processing. Then, it specifies a science-based Reuters news data source and creates a new LWP agent to process the data, similar to the following:

#!/usr/bin/perl
use strict;
use LWP::UserAgent;
use XML::XPath;
my $urlsource="http://feeds.reuters.com/reuters/scienceNews" ;
my $agent = LWP::UserAgent->new;
#Then an infinite while loop is opened, and an HTTP GET request is carried out against the URL. The request is configured, and the agent makes the request via a call to the request method:
while()
{
my $req = HTTP::Request->new(GET => ($urlsource));
$req->header('content-type' => 'application/json');
$req->header('Accept' => 'application/json');
my $resp = $agent->request($req);

If the request is successful, then the XML data returned is defined as the decoded content of the request. Title information is extracted from the XML via an XPath call using the path called /rss/channel/item/title:

    if ( $resp->is_success )
{
my $xmlpage = $resp -> decoded_content;
my $xp = XML::XPath->new( xml => $xmlpage );
my $nodeset = $xp->find( '/rss/channel/item/title' );
my @titles = () ;
my $index = 0 ;

For each node in the extracted title data XML string, data is extracted. It is cleaned of unwanted XML tags and added to a Perl-based array called titles:

     foreach my $node ($nodeset->get_nodelist) {
my $xmlstring = XML::XPath::XMLParser::as_string($node) ;
$xmlstring =~ s/<title>//g;
$xmlstring =~ s/</title>//g;
$xmlstring =~ s/"//g;
$xmlstring =~ s/,//g;
$titles[$index] = $xmlstring ;
$index = $index + 1 ;
} # foreach find node

The same process is carried out for description-based data in the request response XML. The XPath value used this time is /rss/channel/item/description/. There are many more tags to be cleaned from the description data, so there are many more Perl searches and line replacements that act on this data (s///g):

    my $nodeset = $xp->find( '/rss/channel/item/description' );
my @desc = () ;
$index = 0 ;
foreach my $node ($nodeset->get_nodelist) {
my $xmlstring = XML::XPath::XMLParser::as_string($node) ;
$xmlstring =~ s/<img.+/img>//g;
$xmlstring =~ s/href=".+"//g;
$xmlstring =~ s/src=".+"//g;
$xmlstring =~ s/src='.+'//g;
$xmlstring =~ s/<br.+/>//g;
$xmlstring =~ s/</div>//g;
$xmlstring =~ s/</a>//g;
$xmlstring =~ s/<a > //g;
$xmlstring =~ s/<img >//g;
$xmlstring =~ s/<img />//g;
$xmlstring =~ s/<div.+>//g;
$xmlstring =~ s/<title>//g;
$xmlstring =~ s/</title>//g;
$xmlstring =~ s/<description>//g;
$xmlstring =~ s/</description>//g;
$xmlstring =~ s/&lt;.+>//g;
$xmlstring =~ s/"//g;
$xmlstring =~ s/,//g;
$xmlstring =~ s/ | //g;
$desc[$index] = $xmlstring ;
$index = $index + 1 ;
} # foreach find node

Finally, the XML-based title and description data is output in the RSS JSON format using a print command. The script then sleeps for 30 seconds and requests more RSS news information to process:

   my $newsitems = $index ;
$index = 0 ;
for ($index=0; $index < $newsitems; $index++) {
print "{"category": "science","
. " "title": "" . $titles[$index] . "","
. " "summary": "" . $desc[$index] . """
. "} ";
} # for rss items
} # success ?
sleep(30) ;
} # while

We have created a second Scala-based stream processing code example called stream5. It is similar to the stream4 example, but it now processes the rss item data from the stream. Next, case class is defined to process the category, title, and summary from the XML RSS information. An HTML location is defined to store the resulting data that comes from the Flume channel:

case class RSSItem(category : String, title : String, summary : String) {
val now: Long = System.currentTimeMillis
val hdfsdir = "hdfs://hc2nn:8020/data/spark/flume/rss/"

The RSS stream data from the Flume-based event is converted to a string. It is then formatted using the case class called RSSItem. If there is event data, it is then written to an HDFS directory using the previous hdfsdir path:

         rawDstream.map(record => {
implicit val formats = DefaultFormats
read[RSSItem](new String(record.event.getBody().array()))
}).foreachRDD(rdd => {
if (rdd.count() > 0) {
rdd.map(item => {
implicit val formats = DefaultFormats
write(item)
}).saveAsTextFile(hdfsdir+"file_"+now.toString())
}
})

Running this code sample, it is possible to see that the Perl RSS script is producing data, because the Flume script output indicates that 80 events have been accepted and received:

2015-07-07 14:14:24,017 (agent-shutdown-hook) [DEBUG - org.apache.flume.source.ExecSource.stop(ExecSource.java:219)] Exec source with command:./news_rss_collector.py stopped. Metrics:SOURCE:source1{src.events.accepted=80, src.events.received=80, src.append.accepted=0, src.append-batch.accepted=0, src.open-connection.count=0, src.append-batch.received=0, src.append.received=0}
The Scala Spark application stream5 has processed 80 events in two batches:
>>>> Received events : 73
>>>> Received events : 7

The events have been stored on HDFS under the expected directory, as the Hadoop filesystem ls command shows here:

[hadoop@hc2r1m1 stream]$ hdfs dfs -ls /data/spark/flume/rss/
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2015-07-07 14:09 /data/spark/flume/rss/file_1436234439794
drwxr-xr-x - hadoop supergroup 0 2015-07-07 14:14 /data/spark/flume/rss/file_1436235208370

Also, using the Hadoop filesystem cat command, it is possible to prove that the files on HDFS contain rss feed news-based data, as shown here:

[hadoop@hc2r1m1 stream]$  hdfs dfs -cat /data/spark/flume/rss/file_1436235208370/part-00000 | head -1
{"category":"healthcare","title":"BRIEF-Aetna CEO says has not had specific conversations with DOJ on Humana - CNBC","summary":"* Aetna CEO Says Has Not Had Specific Conversations With Doj About Humana Acquisition - CNBC"}

This Spark stream-based example has used Apache Flume to transmit data from an rss source, through Flume, to HDFS via a Spark consumer. This is a good example, but what if you want to publish data to a group of consumers? In the next section, we will examine Apache Kafka--a publish/subscribe messaging system--and determine how it can be used with Spark.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.17.137