Receiver-based approach

The receiver-based approach was the first integration between Spark and Kafka. In this approach, the driver starts receivers on the executors that pull data from the Kafka brokers using the high-level consumer API. Since the receivers pull events from the Kafka brokers, they update the consumed offsets in Zookeeper, which is also used by the Kafka cluster. The key aspect is the use of a WAL (Write Ahead Log), which the receiver writes to as it consumes data from Kafka. When there is a problem and executors or receivers are lost or restarted, the WAL can be used to recover the events and process them. Hence, this log-based design provides both durability and consistency.
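Recovery through the WAL works only if it is explicitly enabled. The following is a minimal sketch of how this is typically done, assuming a 10-second batch interval and a hypothetical HDFS checkpoint path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReceiverBasedKafka")
  // Persist received blocks to the write ahead log before acknowledging them
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The WAL lives under the checkpoint directory, typically on HDFS
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

With this setting, received blocks are written to the log before the receiver updates the offsets in Zookeeper, which is what makes replay after a failure possible.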

Each receiver creates an input DStream of events from a Kafka topic, querying Zookeeper for the Kafka topics, brokers, offsets, and so on. After this, the discussion we had about DStreams in previous sections comes into play.

Long-running receivers make parallelism complicated: each receiver pins an executor core for the lifetime of the application, so the workload is not properly distributed as the application scales (a common workaround is sketched below). Dependence on HDFS for the WAL is also a problem, along with the duplication of write operations. As for the reliability needed for exactly-once processing, only the idempotent approach will work. A transactional approach will not work in the receiver-based approach because there is no way to access the offset ranges from the HDFS location or from Zookeeper.
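The usual workaround for the receiver bottleneck is to create several receiver-based streams and union them so that consumption is spread across multiple receivers. A sketch, assuming zkQuorum, group, and topicMap are defined as in the example later in this section, with numReceivers as a hypothetical tuning knob:

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 4
// Each createStream() call launches its own long-running receiver
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}
// Union the per-receiver DStreams into a single DStream for processing
val unifiedStream = ssc.union(kafkaStreams)

Note that each receiver still occupies an executor core, so this trades cores for throughput rather than removing the limitation.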

The receiver-based approach works with any messaging system, so it's more general purpose.

You can create a receiver-based stream by invoking the createStream() API as follows:

def createStream(
    ssc: StreamingContext,    // StreamingContext object
    zkQuorum: String,         // Zookeeper quorum (hostname:port,hostname:port,...)
    groupId: String,          // The group id for this consumer
    topics: Map[String, Int], // Map of (topic_name -> numPartitions) to consume.
                              // Each partition is consumed in its own thread
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
                              // Storage level to use for storing the received
                              // objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
): ReceiverInputDStream[(String, String)] // DStream of (Kafka message key, Kafka message value)

Shown in the following is an example of creating a receiver-based stream that pulls messages from Kafka brokers:

import org.apache.spark.streaming.kafka.KafkaUtils

// Build the topic -> thread-count map from a comma-separated list of topics
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// Create the receiver-based stream and keep only the message values
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
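
From here, the stream is processed like any other DStream. A minimal follow-up sketch that counts words per batch and then starts the context:

// Count words in each batch and print a sample of the counts
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()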

To summarize, the driver launches receivers on the executors, and the receivers pull data from Kafka using the high-level consumer API. The receivers read the topic offset ranges from the Zookeeper cluster used by Kafka and then also update Zookeeper as they pull events from the brokers.
