Example 3: Interceptors

Flume supports multiple interceptors out of the box, each useful for specific scenarios. As indicated previously, interceptors act on source events, intercepting them to add further details to the event as needed.

To better understand the usage of interceptors, we will take the example of spooled events being streamed into Kafka. We will intercept these spooled events and add two more attributes that can be useful for our Data Lake:

  1. Timestamp (timestamp): Adding a timestamp to every event helps maintain the time profile of the event, so that an event's end-to-end timing can be tracked.
  2. UUID (eventId): Adding a UUID to an event helps uniquely identify each event. Since an event is immutable, tracking an event via its UUID provides traceability.
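
For illustration only (the values here are hypothetical), an intercepted event would then carry the two additional headers alongside its unchanged body, conceptually:

headers: { timestamp=1511913600000, eventId=1b4e28ba-2fa1-11d2-883f-b9a761bde3fb }
body:    the original spooled record, unchanged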

This will require creating a new configuration file, ${FLUME_HOME}/conf/spool-interceptor-kafka-flume-conf.properties, with configurations similar to those in the spool example, plus minor changes in the source and sink configuration and additional interceptor configuration, as shown next:

  1. Source configuration changes: As shown, the changes to the source configuration only involve defining the additional interceptors, namely timestamp and UUID, along with their respective properties:
agent.sources = spool-source
agent.sources.spool-source.type=spooldir
agent.sources.spool-source.spoolDir=/home/centos/flume-data
agent.sources.spool-source.interceptors=ts uuid

#Timestamp Interceptor Definition
agent.sources.spool-source.interceptors.ts.type=timestamp

#UUID Interceptor Definition
agent.sources.spool-source.interceptors.uuid.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.spool-source.interceptors.uuid.headerName=eventId
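# Note: "timestamp" is a built-in alias, whereas the UUID interceptor has no
# alias and must be referenced by its fully qualified Builder class; that class
# ships with the flume-ng-morphline-solr-sink module bundled in the Flume
# binary distribution.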
  2. Channel configuration: The channel configuration remains the same as in the previous example:
agent.channels = memoryChannel
agent.sources.spool-source.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
  3. Sink configuration changes:
# Each sink must be defined
agent.sinks = kafkaSink
agent.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList=localhost:9092
agent.sinks.kafkaSink.topic=spooled-intercepted
agent.sinks.kafkaSink.channel = memoryChannel
agent.sinks.kafkaSink.useFlumeEventFormat=true

As we observe, the change in the sink configuration is the addition of the property useFlumeEventFormat, so that the event headers are also captured, along with the event body, as part of the Kafka message.
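With useFlumeEventFormat=true, the Kafka sink writes each event as an Avro AvroFlumeEvent record (headers plus body) rather than raw body bytes. As a minimal sketch of what downstream decoding could look like (the FlumeEventDecoder class here is hypothetical; only AvroFlumeEvent and the Avro classes come from the Flume SDK and Avro libraries), a consumer could decode a message value like this:

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flume.source.avro.AvroFlumeEvent;

// Hypothetical helper: decodes the value bytes of one Kafka message
// written by the Kafka sink with useFlumeEventFormat=true.
public class FlumeEventDecoder {
    private final SpecificDatumReader<AvroFlumeEvent> reader =
            new SpecificDatumReader<>(AvroFlumeEvent.class);

    public AvroFlumeEvent decode(byte[] messageValue) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(new ByteArrayInputStream(messageValue), null);
        AvroFlumeEvent event = reader.read(null, decoder);
        // event.getHeaders() carries the intercepted timestamp and eventId;
        // event.getBody() is the original spooled record.
        return event;
    }
}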

To rerun the preceding example, this time with interceptors, do the following:

  1. Rename the spool file so that it is no longer marked as COMPLETED, and delete the Kafka logs for the spooled topic; the rename command is shown here:
mv ~/flume-data/spool-1.COMPLETED ~/flume-data/spool-1
  2. Change the directory to ${FLUME_HOME} and run the following command to reprocess the spool file:
${FLUME_HOME}/bin/flume-ng agent --conf ${FLUME_HOME}/conf/ -f ${FLUME_HOME}/conf/spool-interceptor-kafka-flume-conf.properties -n agent -Dflume.root.logger=INFO,console
  3. The output from the Kafka console consumer can be observed as shown next:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --consumer.config ${KAFKA_HOME}/config/consumer.properties --topic spooled-intercepted --bootstrap-server localhost:9092 --from-beginning

Figure 35: Kafka logs revisited to show consumer details

As shown here, the interceptor headers have been captured as part of the message; however, the consumer needs to handle the message headers and the message body accordingly, as sketched next.
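
As an illustration, assuming the hypothetical FlumeEventDecoder sketched earlier and the standard Kafka Java client, a consumer loop that separates the headers from the body might look like the following (the group id is hypothetical):

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SpooledInterceptedConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "spooled-intercepted-reader");   // hypothetical group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");            // read from the beginning

        FlumeEventDecoder decoder = new FlumeEventDecoder();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("spooled-intercepted"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    AvroFlumeEvent event = decoder.decode(record.value());
                    // Headers carry the intercepted timestamp and eventId;
                    // the body is the original spooled record.
                    System.out.println("headers=" + event.getHeaders() + " body="
                            + StandardCharsets.UTF_8.decode(event.getBody()));
                }
            }
        }
    }
}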
