Kafka

Kafka is a publish-subscribe messaging system that provides a reliable Spark Streaming source. The latest Kafka direct API provides a one-to-one mapping between Kafka partitions and the partitions of the RDDs generated by the DStream, along with access to metadata and offsets. Since Kafka is an advanced streaming source as far as Spark Streaming is concerned, its dependency must be added to the build tool of the streaming application. The following artifact should be added to the build tool of your choice before starting with Kafka integration:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.1

After adding the dependency, one also needs basic information about the Kafka setup, such as the server(s) on which Kafka is hosted (bootstrap.servers) and some basic configuration describing the messages, such as the deserializers, the group ID, and so on. The following are a few common properties used to describe a Kafka connection:

  • bootstrap.servers: This describes the host and port of Kafka server(s) separated by a comma.
  • key.deserializer: This is the name of the class to deserialize the key of the messages from Kafka.
  • value.deserializer: This refers to the class that deserializes the value of the message. Since the message is usually text, the StringDeserializer class suffices.
  • group.id: This uniquely identifies the group of consumer processes to which a message consumer belongs.
  • auto.offset.reset: This is used when we begin to consume messages from a topic but there is no initial offset in Kafka, or when the current offset no longer exists on the server. One of the following options then determines which offset Kafka uses:
    • earliest: This automatically resets the offset to the earliest offset
    • latest: This automatically resets the offset to the latest offset
    • none: This throws an exception to the consumer if no previous offset is found for the consumer's group
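
To make the three auto.offset.reset options concrete, the following is a minimal sketch of the decision they describe. It is a toy model written in plain Java, not Kafka's actual implementation; the class name OffsetResetSketch, the method resolveStartOffset, and the sample offsets are hypothetical and exist only for illustration:

```java
import java.util.OptionalLong;

// Toy model of the auto.offset.reset decision; NOT Kafka's real implementation.
class OffsetResetSketch {

    // Returns the offset a consumer would start reading from.
    // committed: the offset stored for the consumer group, if any
    // earliest, latest: the range of offsets still available on the server
    static long resolveStartOffset(String policy, OptionalLong committed,
                                   long earliest, long latest) {
        // A stored offset that still exists on the server is always used as-is
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= earliest && c <= latest) {
                return c;
            }
        }
        // Otherwise the configured policy decides
        switch (policy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            case "none":
                throw new IllegalStateException(
                        "No valid offset found and auto.offset.reset is none");
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // No stored offset: the policy picks an end of the available range
        System.out.println(resolveStartOffset("earliest", OptionalLong.empty(), 100, 250)); // prints 100
        System.out.println(resolveStartOffset("latest", OptionalLong.empty(), 100, 250));   // prints 250
        // A valid stored offset wins regardless of the policy
        System.out.println(resolveStartOffset("latest", OptionalLong.of(180), 100, 250));   // prints 180
        // A stored offset that no longer exists falls back to the policy
        System.out.println(resolveStartOffset("earliest", OptionalLong.of(40), 100, 250));  // prints 100
    }
}
```

Note that the policy only matters when there is no usable stored offset; a consumer group that has already committed a valid offset resumes from it.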

Before moving any further, let's first understand how a basic Kafka consumer is written in Spark Streaming. As a use case, we will consume tweets from a Kafka streaming source and count the hashtags within each micro-batch interval:

// The Spark configuration must be created before the contexts that use it
SparkConf conf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Each micro batch covers two minutes of messages
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.minutes(2));

// Kafka consumer configuration
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.75.1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");

Collection<String> topics = Arrays.asList("mytopic", "anothertopic");

// Create the direct stream from the topics and the consumer configuration
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

// Map each Kafka record to a (key, value) pair and print it
JavaPairDStream<String, String> pairRDD = stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
pairRDD.foreachRDD(pRDD -> pRDD.foreach(tuple -> System.out.println(new Date() + " :: Kafka msg key ::" + tuple._1() + " the val is ::" + tuple._2())));

// Keep only the value and extract the tweet text
// (TweetText is assumed to be a Function<String, String> defined elsewhere in the application)
JavaDStream<String> tweetRDD = pairRDD.map(x -> x._2()).map(new TweetText());
tweetRDD.foreachRDD(tRDD -> tRDD.foreach(x -> System.out.println(new Date() + " :: " + x)));

// Split each tweet on spaces and keep only the words containing a hashtag
JavaDStream<String> hashtagRDD = tweetRDD.flatMap(twt -> Arrays.stream(twt.split(" "))
        .filter(str -> str.contains("#"))
        .collect(Collectors.toList()).iterator());
hashtagRDD.foreachRDD(tRDD -> tRDD.foreach(x -> System.out.println(x)));

// Count the occurrences of each hashtag within the micro batch
JavaPairDStream<String, Long> cntByVal = hashtagRDD.countByValue();
cntByVal.foreachRDD(tRDD -> tRDD.foreach(x -> System.out.println(new Date() + " ::The count tag is ::" + x._1() + " and the val is ::" + x._2())));

// Nothing is consumed until the context is started; the enclosing method must handle InterruptedException
streamingContext.start();
streamingContext.awaitTermination();

As discussed in the previous section, any streaming application needs a StreamingContext, and the same is true for Kafka. After creating the StreamingContext, one should define the configuration parameters for the Kafka connector and group them together in a Map; similarly, all topics are grouped in a collection. We then use the KafkaUtils helper method createDirectStream() to create a DStream object by passing the StreamingContext, a LocationStrategy, and a ConsumerStrategy, which encompasses the collection of topics and the consumer configuration map. This is sufficient to consume Kafka messages as DStreams, where each message from Kafka is a key and value pair. However, we are more interested in the value part of the message, which contains the tweet, so we extract the tweet text, split it into words on whitespace, and finally count each word containing a hashtag.
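
The split, filter, and count steps can be tried without a Kafka or Spark setup. The following is a minimal sketch that applies the same logic to an in-memory list standing in for the tweets of one micro batch; the class name HashtagCountSketch, the method countHashtags, and the sample tweets are hypothetical, and plain Java streams are used here in place of the DStream operations flatMap() and countByValue():

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java stand-in for the per-batch hashtag count; no Spark or Kafka involved.
class HashtagCountSketch {

    // Splits every tweet on spaces, keeps the words containing '#',
    // and counts how often each of those words occurs.
    static Map<String, Long> countHashtags(List<String> tweets) {
        return tweets.stream()
                .flatMap(tweet -> Arrays.stream(tweet.split(" ")))
                .filter(word -> word.contains("#"))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical tweets representing the contents of a single micro batch
        List<String> batch = Arrays.asList(
                "learning #spark today",
                "#spark and #kafka work well together");

        // Print each hashtag with its count (iteration order is not guaranteed)
        countHashtags(batch).forEach((tag, count) ->
                System.out.println("The count tag is :: " + tag + " and the val is :: " + count));
    }
}
```

As in the streaming version, a word only needs to contain the # character to be counted, so punctuation attached to a hashtag produces a separate key.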

This section covered the basics of streaming sources. Some aspects of consuming messages from Kafka need more detail and an understanding of the use case, and hence are out of scope in the current context. Nevertheless, the Kafka and Spark documentation covers in detail how custom offset strategies can be implemented and how state can be recovered after a failure using checkpointing (refer to the Fault tolerance and reliability section) and other mechanisms. In the next section, we will learn about stream transformations such as stateful and stateless operations.
