Example 4 - Memory channel, file channel, and Kafka channel

So far, all our examples have used the memory channel. Flume supports a few more channels, of which the File channel and the Kafka channel are the best known. Each of these channels connects to sources and sinks in exactly the same way; however, there are a few subtle differences in the way they are defined. For example, if we define a File channel, we need to provide the locations of its data and checkpoint files along with a few other file-related attributes, and also ensure that the user running Flume has sufficient permissions to access those locations.
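For instance, a quick shell-level check (using the directory paths that appear later in this example) can confirm that the account which will run the Flume agent is able to write to the File channel's locations:

# Rough permission check: the Flume agent's user must be able to write to
# both the data and checkpoint directories (paths from the later example)
mkdir -p ~/flume-data/flume-channel/data ~/flume-data/flume-channel/checkpoint
touch ~/flume-data/flume-channel/data/.perm-test ~/flume-data/flume-channel/checkpoint/.perm-test \
  && rm ~/flume-data/flume-channel/data/.perm-test ~/flume-data/flume-channel/checkpoint/.perm-test \
  && echo "File channel directories are writable"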

On the other hand, if we define a Kafka channel, we need the Kafka connection URL as well as a topic name; that topic acts as the channel for further consumption. In this case, the topology could be, for example, Source → Kafka channel → HDFS.
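As an illustration only, a minimal sketch of such a Source → Kafka channel → HDFS topology could look like the following; the HDFS sink name and path here are made up for the sketch and are not used in the examples that follow:

# Sketch: spooling directory source -> Kafka channel -> HDFS sink
agent.sources = spool-source
agent.sources.spool-source.type = spooldir
agent.sources.spool-source.spoolDir = /home/centos/flume-data
agent.sources.spool-source.channels = kafkaChannel

# The Kafka topic itself buffers the events
agent.channels = kafkaChannel
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.kafka.bootstrap.servers = localhost:9092
agent.channels.kafkaChannel.kafka.topic = datalakeChannel

# HDFS sink draining the channel into the storage layer
# (the path below is a placeholder)
agent.sinks = hdfsSink
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /data/flume/events
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.channel = kafkaChannel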

Let us replay the spool example with the preceding variations and observe how it changes the way our example works:

  1. File Channel:
    1. Copy the ${FLUME_HOME}/conf/spool-interceptor-kafka-flume-conf.properties file to ${FLUME_HOME}/conf/spool-fileChannel-kafka-flume-conf.properties and change the source, channel and sink configurations as shown next:
agent.sources = spool-source
agent.sources.spool-source.type=spooldir
agent.sources.spool-source.spoolDir=/home/centos/flume-data
agent.sources.spool-source.interceptors=ts uuid

#Timestamp Interceptor Definition
agent.sources.spool-source.interceptors.ts.type=timestamp

#UUID Interceptor Definition
agent.sources.spool-source.interceptors.uuid.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.spool-source.interceptors.uuid.headerName=eventId

# The channel can be defined as follows.
agent.channels = fileChannel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.capacity = 100
agent.channels.fileChannel.transactionCapacity=10
agent.channels.fileChannel.dataDirs=/home/centos/flume-data/flume-channel/data
agent.channels.fileChannel.checkpointDir=/home/centos/flume-data/flume-channel/checkpoint
agent.sources.spool-source.channels = fileChannel

# Each sink must be defined
agent.sinks = kafkaSink
agent.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList=localhost:9092
agent.sinks.kafkaSink.topic=spooled-fileChannel
agent.sinks.kafkaSink.channel = fileChannel
agent.sinks.kafkaSink.useFlumeEventFormat=true
    2. Rename the completed spool file to spool-1 as specified in the earlier example.
    3. Create the Flume channel's data and checkpoint directories for transactional and rollback needs:
mkdir -p ~/flume-data/flume-channel/data
mkdir -p ~/flume-data/flume-channel/checkpoint
    4. Ensure that the channel capacity (agent.channels.fileChannel.capacity = 100 in the configuration) is always greater than the transaction capacity (agent.channels.fileChannel.transactionCapacity = 10 in the configuration).
    5. Run the Flume process again for the spool file with the following command:
${FLUME_HOME}/bin/flume-ng agent --conf ${FLUME_HOME}/conf/  -f ${FLUME_HOME}/conf/spool-fileChannel-kafka-flume-conf.properties  -n agent -Dflume.root.logger=INFO,console
    6. Verification and Observations:
      • Verify the messages with the same Kafka console consumer command as in the previous example; all the messages must be seen exactly the same way as in the previous example:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --consumer.config ${KAFKA_HOME}/config/consumer.properties --topic spooled-fileChannel --bootstrap-server localhost:9092 --from-beginning
      • Observe the Flume channel data directory for the Flume log created, and inspect the content of the log file (~/flume-data/flume-channel/data/log-1) as shown next:

Figure 37: Flume log
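The data file itself is in a binary, Flume-internal format, so it is not meant to be read directly; for a rough look from the shell, something like the following pulls out the human-readable fragments, including the event bodies:

# Rough inspection only: print readable fragments of the binary channel log
strings ~/flume-data/flume-channel/data/log-1 | head -20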

  2. Kafka Channel:
    1. Copy ${FLUME_HOME}/conf/spool-interceptor-kafka-flume-conf.properties file to ${FLUME_HOME}/conf/spool-kafkaChannel-kafka-flume-conf.properties and change the source, channel, and sink configurations as shown next:
agent.sources = spool-source
agent.sources.spool-source.type=spooldir
agent.sources.spool-source.spoolDir=/home/centos/flume-data
agent.sources.spool-source.interceptors=ts uuid

#Timestamp Interceptor Definition
agent.sources.spool-source.interceptors.ts.type=timestamp

#UUID Interceptor Definition
agent.sources.spool-source.interceptors.uuid.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.spool-source.interceptors.uuid.headerName=eventId

# The channel can be defined as follows.
agent.channels = kafkaChannel
agent.channels.kafkaChannel.type =org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.kafka.bootstrap.servers=localhost:9092
agent.channels.kafkaChannel.kafka.topic=datalakeChannel
agent.sources.spool-source.channels = kafkaChannel

# Each sink must be defined
agent.sinks = kafkaSink
agent.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList=localhost:9092
agent.sinks.kafkaSink.topic=spooled
agent.sinks.kafkaSink.channel = kafkaChannel
agent.sinks.kafkaSink.useFlumeEventFormat=true
    2. Rename the completed spool file to spool-1.log and clear the Kafka logs as specified in the earlier example.
    3. Run the Flume process again for the spool file with the following command:
${FLUME_HOME}/bin/flume-ng agent --conf ${FLUME_HOME}/conf/ -f ${FLUME_HOME}/conf/spool-kafkaChannel-kafka-flume-conf.properties -n agent -Dflume.root.logger=INFO,console
    4. Verification and Observation:
      • Observe the channel-specific Kafka topic and its queue depth:
${KAFKA_HOME}/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic datalakeChannel

Figure 38: The channel-specific Kafka topic's queue depth in the log

      • Verify the messages with the same Kafka console consumer command as in the previous example; all the messages must be seen exactly the same way as in the previous example:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --consumer.config ${KAFKA_HOME}/config/consumer.properties --topic datalakeChannel --bootstrap-server localhost:9092 --from-beginning

As we observed here, we used Kafka as a channel and then propagated the events back into another Kafka topic. In practice this is rarely what you want; more commonly, the events are either pulled into a processing pipeline directly from the Kafka channel or ingested straight into the storage layers. The example is included mainly to highlight an important point: Kafka can be used as a channel rather than only as a destination sink. This is a big advantage, since most near-real-time and batch processing frameworks would want to consume events from Kafka as a channel.

In this chapter, we looked at Flume as an additional acquisition layer component to capture much richer customer information from various sources. We covered both RDBMS and spool files as input, each of them feeding customer information into the data layer. The overall acquisition layer, with Flume added alongside the existing Sqoop component, can be visualized in the following figure:

Figure 39: Acquisition Layer with Sqoop and Flume for Single Customer View
