Spark Streaming with Kafka and HBase

Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Kafka plays an important role in any streaming application. Consider what happens without Kafka: if the application processing the stream goes down for one minute for some reason, that minute's worth of data is lost. Adding Kafka as a layer in front of the streaming application buffers the incoming stream and prevents data loss. Also, if something goes wrong within the Spark Streaming application or the target database, messages can be replayed from Kafka. Once the streaming application pulls a message from Kafka, an acknowledgement is sent to Kafka only after the data has been replicated within the streaming application, which makes the Kafka receiver a reliable receiver.

There are two approaches for receiving data from Kafka.

Receiver-based approach

Using the Kafka consumer API, receivers in a Spark Streaming application receive data from Kafka partitions. As with all receivers, the data received from Kafka through a receiver is stored in Spark executors, and then jobs launched by Spark Streaming process the data. The offsets consumed by the receiver are stored in Zookeeper to track progress. To ensure zero data loss, you additionally have to enable the write-ahead log (WAL) in Spark Streaming. This process is illustrated in Figure 5.5.

Figure 5.5: Spark Streaming with Kafka with receivers
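As a minimal sketch of what enabling the WAL involves (the application name and checkpoint path here are only illustrative assumptions), the relevant settings look like this:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Enable the write-ahead log for receiver-based input streams.
conf = SparkConf().set("spark.streaming.receiver.writeAheadLog.enable", "true")
sc = SparkContext(appName="ReceiverWithWAL", conf=conf)
ssc = StreamingContext(sc, 10)  # 10-second batches

# The WAL is written into the checkpoint directory, so checkpointing must be enabled;
# the HDFS path below is an assumption -- use any fault-tolerant directory.
ssc.checkpoint("hdfs:///user/cloudera/streaming-checkpoint")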

Let's go through a receiver-based Kafka word count example that ships with the Spark installation. The easiest way to work with Kafka is to install and start a Kafka broker. Run the following commands to download, unzip, and start the Kafka broker:

wget http://www.trieuvan.com/apache/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
tar xzvf kafka_2.10-0.8.2.0.tgz
cd kafka_2.10-0.8.2.0/
bin/kafka-server-start.sh config/server.properties

Open another terminal and create a topic called test with two partitions and a replication factor of 1:

bin/kafka-topics.sh --zookeeper quickstart.cloudera:2181 --topic test --create --partitions 2 --replication-factor 1

Start a Kafka console producer to start sending messages to the topic test:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Open another terminal and submit the Kafka word count job:

cd /usr/lib/spark/examples/lib/streaming
spark-submit --master local[*] --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.0 kafka_wordcount.py localhost:2181 test

Now, let's enter some messages in the Kafka console producer window; they should show up as word counts in the output of the Spark Streaming program.
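For reference, the shipped kafka_wordcount.py is roughly equivalent to the following sketch (argument handling simplified), built around KafkaUtils.createStream from the spark-streaming-kafka package:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    zkQuorum, topic = sys.argv[1:]          # for example: localhost:2181 test
    sc = SparkContext(appName="KafkaWordCount")
    ssc = StreamingContext(sc, 1)           # 1-second batches

    # Receiver-based stream: a receiver pulls from Kafka; offsets are tracked in Zookeeper.
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()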

Kafka assigns an offset number to every message published to a topic partition. We can see the latest offsets with the following commands:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 --partitions 0
test:0:18
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 --partitions 1
test:1:16

Role of Zookeeper

In the receiver-based approach, the consumer, which is a Spark Streaming program, maintains offsets in Zookeeper. We can watch the offsets by entering the zkCli shell. Use the following commands to check the offsets of the spark-streaming-consumer group:

/usr/lib/zookeeper/bin/zkCli.sh -server localhost:2181 get /consumers/spark-streaming-consumer/offsets/test/0
/usr/lib/zookeeper/bin/zkCli.sh -server localhost:2181 get /consumers/spark-streaming-consumer/offsets/test/1

The output of the previous commands will be similar to the following. Note that 18 is the latest offset of the Spark Streaming consumer for partition 0. This offset number is exactly the same as the offset in Kafka (see the previous result), which means that all the messages from Kafka have been consumed:

WatchedEvent state:SyncConnected type:None path:null
18
cZxid = 0x44f
ctime = Sun Dec 20 15:47:14 PST 2015
mZxid = 0x4bd
mtime = Sun Dec 20 15:58:07 PST 2015
pZxid = 0x44f
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0

Let's check the lag between the offsets of Kafka and the Spark Streaming consumer using the following command:

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic test --group spark-streaming-consumer
Group                     Topic  Pid  Offset  logSize  Lag
spark-streaming-consumer  test   0    18      18       0
spark-streaming-consumer  test   1    16      16       0

Note

Note that the lag is 0 for both partitions.

Direct approach (no receivers)

The direct approach was introduced in Spark 1.3 to ensure exactly-once semantics for receiving data, even in the case of failures. It periodically queries Kafka for the latest offsets in each topic and partition, and accordingly defines the offset ranges to process in each batch, as shown in Figure 5.6.

Figure 5.6: Spark Streaming with Kafka direct approach

This approach provides the following benefits:

  • Simplified parallelism: There is no need to create multiple input streams and union them. Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume. This one-to-one mapping between Kafka partitions and RDD partitions is easier to understand and tune.
  • Efficiency: There is no need for a WAL, because messages can be re-read from Kafka on failure.
  • Exactly-once semantics: This approach eliminates inconsistencies between Zookeeper and Kafka, so each record is received by Spark Streaming effectively exactly once despite failures.

Let's run a Kafka direct word count now. Use the same procedure as before to enter messages in the Kafka console producer, and then start the Spark Streaming program with the following command. Note that this program takes the Kafka broker address as an argument, whereas the earlier program took the Zookeeper address; offsets are no longer maintained in Zookeeper:

cd /usr/lib/spark/examples/lib/streaming
spark-submit --master local[*] --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.0 direct_kafka_wordcount.py localhost:9092 test
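For reference, direct_kafka_wordcount.py is essentially the same word count built around KafkaUtils.createDirectStream, roughly as sketched here (argument handling simplified):

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    brokers, topic = sys.argv[1:]           # for example: localhost:9092 test
    sc = SparkContext(appName="DirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)           # 2-second batches

    # Direct stream: no receiver; Spark queries Kafka for offsets each batch and
    # creates one RDD partition per Kafka partition.
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    counts = kvs.map(lambda x: x[1]) \
                .flatMap(lambda line: line.split(" ")) \
                .map(lambda word: (word, 1)) \
                .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()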

The number of records per second is controlled by setting spark.streaming.receiver.maxRate (for the receiver-based approach) and spark.streaming.kafka.maxRatePerPartition (for the direct approach).
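As a sketch, these limits can be set on the SparkConf before creating the StreamingContext; the values below are purely illustrative:

from pyspark import SparkConf

# Receiver-based approach: at most 1,000 records/second per receiver (illustrative value).
# Direct approach: at most 1,000 records/second per Kafka partition (illustrative value).
conf = SparkConf() \
    .set("spark.streaming.receiver.maxRate", "1000") \
    .set("spark.streaming.kafka.maxRatePerPartition", "1000")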

Integration with HBase

Integration with HBase is fairly easy. Use newAPIHadoopRDD to read HBase data and saveAsNewAPIHadoopDataset to write data to HBase. Let's go through an HBase write example: create a table called test with the column family f1, and then run a Spark job that writes a row to the test table with column col1 and value value1:

[cloudera@quickstart lib]$ hbase shell

hbase(main):002:0> create 'test', 'f1'
0 row(s) in 0.6460 seconds

cd /usr/lib/spark/examples/lib/

spark-submit --master local[*] --driver-class-path /usr/lib/spark/lib/spark-examples.jar hbase_outputformat.py localhost test row1 f1 col1 value1

hbase(main):005:0> scan 'test'
ROW            COLUMN+CELL
row1           column=f1:col1, timestamp=1450657755249, value=value1
1 row(s) in 0.0700 seconds
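For the read direction mentioned earlier, a minimal sketch using newAPIHadoopRDD looks roughly like the following; it assumes the Python converter classes shipped in spark-examples.jar are available on the classpath:

from pyspark import SparkContext

sc = SparkContext(appName="HBaseRead")
conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapreduce.inputtable": "test"}

# TableInputFormat yields (ImmutableBytesWritable, Result) pairs; the converters below
# (from spark-examples.jar) turn them into plain strings for Python.
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)
print(hbase_rdd.collect())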

Similar logic from the previous example can be applied in Spark Streaming as well. Transform each record into a (key, value) pair of the form (rowkey, [row key, column family, column name, value]) and then write the DStream to HBase. Alternatively, you can use foreachRDD to write the data out, as sketched below.
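Here is a minimal sketch of that streaming write path, reusing the output configuration and converter classes from the hbase_outputformat.py example; the table name, column family f1, and column col1 are carried over from the previous example, and counts is assumed to be the (word, count) DStream from the word count program:

def save_to_hbase(rdd):
    # Output configuration as used by hbase_outputformat.py.
    conf = {"hbase.zookeeper.quorum": "localhost",
            "hbase.mapred.outputtable": "test",
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    rdd.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

# Each (word, count) element becomes one HBase Put keyed by the word,
# written into column family f1, column col1.
counts.map(lambda wc: (wc[0], [wc[0], "f1", "col1", str(wc[1])])) \
      .foreachRDD(save_to_hbase)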

Tip

Note that the HBase native API provides put, get, scan, filter, and coprocessor methods, while Hadoop's InputFormat and OutputFormat APIs are used to read and write data. Reading data from HBase through the Hadoop API is slower than using the HBase native API. Spark SQL on HBase or the Spark connector for HBase (introduced in Chapter 4, Big Data Analytics with Spark SQL, DataFrames, and Datasets) combines the native HBase APIs with Spark's in-memory performance, and is the recommended approach for optimized performance.
