Step 2 - Consuming Streams via Flink

Now that we have all 100K records queued in Kafka, the next step is to consume these messages with Flink and begin building an execution pipeline.

Flink ships with a number of built-in connectors, one of which is the Kafka source connector. To include the Kafka connector, the following dependencies need to be added to the project’s pom.xml file (refer to flink-example1/pom.xml):

<!-- The Scala version suffix (_2.11 here) should be consistent across all Flink artifacts -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.2.0</version>
</dependency>
Code 02: Flink Dependencies for Consumption from Kafka

To consume messages from the Kafka topic, the following code can be used as a reference. The same code is used in our example in the com.laketravels.ch08.consumer.FlinkProcessor class, which contains the main method:

// JSON object mapper and Flink properties loaded from flink.properties
final ObjectMapper mapper = new ObjectMapper();
Properties flinkProps = PropertyLoader.loadProperty("flink.properties");

// Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 2 seconds with exactly-once processing guarantees
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

// Parse user parameters passed on the command line
ParameterTool parameterTool = ParameterTool.fromArgs(args);

// Create a Kafka source that deserializes each record into a Tuple2
DataStream<Tuple2<IntWritable, Text>> messageStream = env.addSource(
        new FlinkKafkaConsumer010<Tuple2<IntWritable, Text>>(
                parameterTool.getRequired("topic"),
                new Tuple2DeserializerSchema(),
                parameterTool.getProperties()));

// Redistribute the records across parallel subtasks and print them
messageStream.rebalance().print();
Code 03: Code to consume Kafka messages from Flink Process

The main method in the FlinkProcessor class requires the following arguments to launch the execution pipeline successfully:

  • topic - The name of the Kafka topic from which the messages are to be consumed
  • bootstrap.servers - Comma-separated list of ip:port addresses of the Kafka broker processes
  • zookeeper.connect - The ZooKeeper connect address, in the form ip:port
  • group.id - Identifies the consumer group used for message consumption and group-level offset management

In our example, we pass these parameters as command-line arguments, as shown below. ParameterTool parses the arguments and makes them available for substitution within the code:

--topic customer --bootstrap.servers <KAFKA_SERVER_IP>:9092 --zookeeper.connect <ZOOKEEPER_IP>:2181 --group.id 1 --auto.offset.reset earliest
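
For reference, ParameterTool.fromArgs(args) turns each --key value pair into an entry of the Properties object returned by parameterTool.getProperties(), and that object is handed directly to FlinkKafkaConsumer010. A minimal, hand-built sketch of the equivalent consumer configuration (using the same placeholder addresses as above) would be:

// Hypothetical equivalent of the configuration that ParameterTool.getProperties()
// assembles from the command-line arguments shown above
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "<KAFKA_SERVER_IP>:9092"); // Kafka broker list
kafkaProps.setProperty("zookeeper.connect", "<ZOOKEEPER_IP>:2181");    // ZooKeeper address
kafkaProps.setProperty("group.id", "1");                               // consumer group id
kafkaProps.setProperty("auto.offset.reset", "earliest");               // read from the earliest offset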

In the preceding code, the messages are consumed using the custom Tuple2DeserializerSchema. FlinkKafkaConsumer010 requires a deserialization schema (such as Flink's built-in SimpleStringSchema, or a custom implementation like this one) to turn the raw Kafka bytes into elements of the data stream; a sketch of such a schema is shown below.
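
The Tuple2DeserializerSchema class referenced above is part of the example project, so its exact implementation is not reproduced here. As an illustration only, a minimal keyed deserialization schema that produces Tuple2<IntWritable, Text> elements, assuming the producer wrote the record key as a numeric string and the value as UTF-8 text, might look like this:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Illustrative sketch only - the example project's Tuple2DeserializerSchema may differ
public class Tuple2DeserializerSchema implements KeyedDeserializationSchema<Tuple2<IntWritable, Text>> {

    @Override
    public Tuple2<IntWritable, Text> deserialize(byte[] messageKey, byte[] message,
                                                 String topic, int partition, long offset) throws IOException {
        // Assumes the key was written as a numeric string and the value as UTF-8 text
        int key = (messageKey == null) ? -1 : Integer.parseInt(new String(messageKey, StandardCharsets.UTF_8));
        String value = (message == null) ? "" : new String(message, StandardCharsets.UTF_8);
        return new Tuple2<>(new IntWritable(key), new Text(value));
    }

    @Override
    public boolean isEndOfStream(Tuple2<IntWritable, Text> nextElement) {
        // A Kafka topic is an unbounded stream; never signal end-of-stream
        return false;
    }

    @Override
    public TypeInformation<Tuple2<IntWritable, Text>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<IntWritable, Text>>() {});
    }
}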

To replay the messages or re-submit the job, the Flink job can be run with a different group.id. Because a new consumer group has no committed offsets, setting auto.offset.reset to earliest makes it start reading from the beginning of the topic.

We have now consumed the messages from Kafka; the next step is to use Flink as a channel to persist them into HDFS, which is explained next.
