Chapter 3. Sources

Sources are the components responsible for accepting data into a Flume agent. Sources can accept data from other systems, like the Java Message Service (JMS), or the output of other processes. Sources are also used to receive data from other Flume agents whose sinks send data via RPC. There are even sources that can produce data. It is possible to write sources to accept data from pretty much anything!

The data that sources receive from an external system or from other agents (or produce themselves) is then written out to one or more channels configured for the source. This is the basic responsibility of a source.

In this chapter, we will discuss the design and working of various sources that come packaged with Flume and how to configure them optimally for use; we will also look at how to write a custom source.

Lifecycle of a Source

Sources are named components that are configured like any other component through the configuration file. Flume’s configuration system validates each source’s configuration and discards sources that are incorrectly configured. The validation done by the configuration system is pretty minimal, though. The Flume configuration system ensures that:

  • Each source has at least one properly configured channel “connected” to it.

  • Each source has a type parameter defined.

  • The source is in the active list of sources for the agent.

Once the configuration system approves a source, it is then instantiated and configured by the ConfigurationProvider. If the source is misconfigured or is missing required parameters, this source is discarded. Once the source is successfully configured, Flume’s lifecycle management system will attempt to start the source. The source is then stopped only if the agent itself is stopped or killed, or if the agent is reconfigured by the user.
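The three minimal checks listed above can be pictured with a short Java sketch. This is not Flume's actual validation code: the class name SourceConfigCheck and its methods are hypothetical, the agent and component names in main are made up, and "properly configured channel" is simplified here to "a channel that appears in the agent's active channel list".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration only; Flume's real configuration system does more.
class SourceConfigCheck {

  // Splits a space-separated property value into names (empty list if unset).
  static List<String> names(Properties props, String key) {
    String value = props.getProperty(key, "").trim();
    if (value.isEmpty()) {
      return Collections.emptyList();
    }
    return Arrays.asList(value.split("\\s+"));
  }

  // Returns true if the named source passes the three minimal checks.
  static boolean isValid(Properties props, String agent, String source) {
    String prefix = agent + ".sources." + source + ".";
    // 1. The source is in the agent's active list of sources.
    boolean active = names(props, agent + ".sources").contains(source);
    // 2. The source has a type parameter defined.
    boolean hasType = !props.getProperty(prefix + "type", "").trim().isEmpty();
    // 3. At least one of the source's channels is an active channel
    //    (a simplification of "properly configured").
    List<String> agentChannels = names(props, agent + ".channels");
    boolean hasChannel = false;
    for (String channel : names(props, prefix + "channels")) {
      if (agentChannels.contains(channel)) {
        hasChannel = true;
      }
    }
    return active && hasType && hasChannel;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("agent.sources", "src1");
    props.setProperty("agent.channels", "ch1");
    props.setProperty("agent.sources.src1.type", "avro");
    props.setProperty("agent.sources.src1.channels", "ch1");
    System.out.println("src1 valid: " + isValid(props, "agent", "src1"));
  }
}
```

A source that fails any one of these checks would be discarded in this sketch, which mirrors the behavior described above without claiming to reproduce it exactly.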

Sources, like all Flume components, require their type to be specified in the configuration. This can be the fully qualified class name (FQCN), or the alias for built-in sources. All sources require at least one properly configured channel to write to. Therefore, a list of channels is also a mandatory parameter for the source to be considered properly configured. The required parameters are shown in Table 3-1.

Table 3-1. Mandatory configuration parameters for all sources
Parameter Description

type

The type of the source. This can be the FQCN or the alias of the source (aliases exist only for sources that are part of Flume itself). The class for a custom source must be installed using the plugins.d framework described in “Deploying Custom Code”. The alias for each of Flume’s built-in sources is mentioned in the relevant section.

channels

A space-separated list of channels the source should write events to. For all channels selected for writes by the channel selector based on the event and the routing parameters, the source will write events to the channels in the order specified in the configuration file. More details on this are in “Channel Selectors”.

A source named usingFlumeSource of type avro, running in an agent started with the name usingFlume, would be configured with a file that looks like:

usingFlume.sources = usingFlumeSource
usingFlume.channels = memory

usingFlume.sources.usingFlumeSource.type = avro
usingFlume.sources.usingFlumeSource.channels = memory
usingFlume.sources.usingFlumeSource.port = 7877
usingFlume.sources.usingFlumeSource.bind = 0.0.0.0

There are a few parameters that can optionally be passed to all sources. These are meant to configure interceptors and channel selectors for the source. They are shown in Table 3-2. These parameters are passed to the source just as any other configuration parameters are passed in.

Table 3-2. Optional configuration parameters for all sources
Parameter Description

interceptors

A list of names that represent a chain of interceptors

interceptors.<interceptor_name>.*

Parameters to pass to the interceptor with the specific name

selector

The alias or FQCN of the channel selector to use; if no selectors are specified, the replicating channel selector is used

selector.*

Configuration parameters to pass to the channel selector

An example configuration for two interceptors for a source named avro is shown in Example 3-1. Two interceptors, host and static (named i1 and i2, respectively), are configured to intercept events being received by the source. As you can see, interceptors can accept configuration with the interceptors.<interceptor_name>. prefix.

Example 3-1. Configuring interceptors
agent.sources.avro.interceptors = i1 i2
agent.sources.avro.interceptors.i1.type = host
agent.sources.avro.interceptors.i1.preserveExisting = true
agent.sources.avro.interceptors.i2.type = static
agent.sources.avro.interceptors.i2.key = header
agent.sources.avro.interceptors.i2.value = staticValue

Each source has exactly one channel selector (which is why it is not a named component and can be configured using the selector configuration suffix). Though the channel selector configuration looks like the configuration of a source subcomponent, the source does not need to configure the selector—this is done by the configuration system. In Example 3-2, the channel processor for the source named avro configures a multiplexing channel selector to bifurcate the flow of events from the source. We will discuss the specific configuration parameters for a multiplexing channel selector in Chapter 6, but as this example demonstrates, selectors can accept configuration parameters based on which they can return the channels to which the source should write specific events.

Example 3-2. Channel selector configuration
agent.sources.avro.selector.type = multiplexing
agent.sources.avro.selector.header = priority
agent.sources.avro.selector.mapping.1 = channel1
agent.sources.avro.selector.mapping.2 = channel2
agent.sources.avro.selector.default = channel2
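The routing decision configured in Example 3-2 can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class MultiplexingSketch is hypothetical and is not Flume's multiplexing channel selector, and each mapping here points to a single channel name for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of header-based routing; not Flume's implementation.
class MultiplexingSketch {
  private final String header;
  private final Map<String, String> mapping;
  private final String defaultChannel;

  MultiplexingSketch(String header, Map<String, String> mapping,
      String defaultChannel) {
    this.header = header;
    this.mapping = mapping;
    this.defaultChannel = defaultChannel;
  }

  // Returns the channel name for an event with the given headers. Events
  // with no matching header value go to the default channel.
  String channelFor(Map<String, String> eventHeaders) {
    String value = eventHeaders.get(header);
    return mapping.getOrDefault(value, defaultChannel);
  }

  public static void main(String[] args) {
    Map<String, String> mapping = new HashMap<>();
    mapping.put("1", "channel1");
    mapping.put("2", "channel2");
    MultiplexingSketch selector =
        new MultiplexingSketch("priority", mapping, "channel2");

    Map<String, String> headers = new HashMap<>();
    headers.put("priority", "1");
    System.out.println(selector.channelFor(headers));
  }
}
```

The sketch mirrors the configuration keys: header names the event header to inspect, each mapping entry ties a header value to a channel, and default covers everything else.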

If an agent is reconfigured, the same instance of the source class will not be reused. Therefore, all sources packaged with Flume are stateless. It is expected that any custom sources that are plugged into Flume are also stateless to avoid data loss.

Now that we have covered the basic concepts of sources, let’s discuss the various sources that come packaged with Flume.

Sink-to-Source Communication

One of the most important features of Flume is the ease of horizontally scaling a Flume deployment. The reason this can be done easily is that it’s trivial to add new agents to a Flume deployment and configure them to send data to other Flume agents. Similarly, once a new agent is added, it is fairly easy to configure the agents that are already running to write to this agent by simply updating the configuration file.

At the center of this flexibility is Flume’s RPC sink–source combination. As discussed in “Getting Flume Agents to Talk to Each Other”, RPC sinks are designed to send events to RPC sources—Thrift Sink to Thrift Source and Avro Sink to Avro Source. RPC sinks and sources are highly scalable, with the sources being able to receive data from a large number of sinks or RPC clients. Even though each RPC sink can send data only to one RPC source, each agent can be configured to send data to multiple other agents using sink groups and sink processors; see “Sink Groups and Sink Processors”.

In this section, we will discuss Flume’s RPC sources and the various aspects of configuring and deploying them.

Avro Source

Flume’s primary RPC source is the Avro Source. The Avro Source is designed to be a highly scalable RPC server that accepts data into a Flume agent, from another Flume agent’s Avro Sink or from a client application that uses Flume’s SDK to send data. The Avro Source together with the Avro Sink represents Flume’s internal communication mechanism (between Flume agents). With the scalability of the Avro Source combined with the channels that act as a buffer, Flume agents can handle significant load spikes.

Flume’s Avro Source uses the Netty-Avro inter-process communication (IPC) protocol to communicate. As a result, it is possible to send data to the Avro Source only from Java or other JVM languages. To send events from your application to an agent with an Avro Source, you can make use of the Flume SDK (see “Flume Client SDK”) or the embedded agent (see “Embedded Agent”).

An Avro Source can be configured to accept compressed events from an Avro Sink that is configured to output them. It can also be configured to make sure that any clients or sinks sending data to it encrypt the data using SSL. An Avro Source’s configuration parameters are detailed in Table 3-3.

Table 3-3. Avro Source configuration
Config parameter Default value Description

type

-

The alias for an Avro Source is avro. The FQCN, which is org.apache.flume.source.AvroSource (case sensitive), can also be used.

bind

-

The IP address/hostname to bind to. To bind to all interfaces on the machine, use 0.0.0.0.

port

-

The port to bind to.

threads

infinity

The maximum number of worker threads to accept incoming data from clients/Avro Sinks.

ssl

false

Should SSL be enabled? If this is set to true, all clients connecting to this source are required to use SSL. If SSL is enabled, the keystore and keystore-password parameters are required.

keystore

-

The path to the keystore to use for SSL. This is a required parameter if SSL is enabled.

keystore-password

-

The password to be used to open the keystore. This is a required parameter if SSL is enabled.

keystore-type

JKS

The type of keystore that is being used.

compression-type

-

The compression format used to decompress the incoming data. The only compression format supported is zlib [zlib_ch3]. To accept zlib-compressed data, set this parameter to deflate.

Configuring the Avro Source requires only a minimal set of parameters. Other than the type and channels parameters common to all sources, the source requires two mandatory parameters: bind and port. These two parameters define the socket address that the source uses. If there are multiple network interfaces, the Avro Source can bind to one or all of them. To bind to just one of the interfaces, simply set the IP address/hostname of that interface as the value of the bind parameter. To bind to all interfaces, use 0.0.0.0 as the value of the bind parameter. The port parameter defines the port number the source should listen on, for the configured bind address(es).

The Avro Source uses a Netty server [netty] to serve incoming requests. The Netty server uses Java’s nonblocking I/O (NIO) [nio], which allows it to be highly performant while using a relatively small number of threads to handle the requests. The Avro Source allows the user to configure the maximum number of threads that the source should use to handle incoming data using the threads parameter. This allows the user to keep a check on the resources consumed by the source. Though there is no theoretical maximum on the number of threads, the actual number is limited by the JVM, the OS, and the hardware.
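The effect of capping the number of worker threads can be illustrated with a plain Java thread pool. This sketch does not use Netty or any Flume class, and the name BoundedWorkersSketch is hypothetical. It only shows the general idea that a bounded pool handles many requests with a fixed number of threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; it does not reflect how the Avro Source is implemented.
class BoundedWorkersSketch {

  // Runs `tasks` small jobs on at most `maxThreads` workers and returns the
  // number of distinct worker threads that actually ran a job.
  static int distinctWorkers(int maxThreads, int tasks) {
    Set<String> workers = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> workers.add(Thread.currentThread().getName()));
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return workers.size();
  }

  public static void main(String[] args) {
    // Twenty jobs are handled, but never by more than two threads.
    System.out.println("workers used: " + distinctWorkers(2, 20));
  }
}
```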

SSL Keystores

A keystore is a collection of cryptographic keys and certificates, as defined by the Java standard [keystore]. Each keystore is protected by a password, which is used to load the keystore. In Flume’s case, this password is stored in the Flume configuration file as plain text; the configuration file must be guarded by the correct permissions to avoid this password falling into the wrong hands.

If the Avro Sink(s) or RPC clients sending data to Flume are configured to use SSL to send the events to the Avro Source, the Avro Source must be configured with the SSL-related parameters. The ssl parameter must be set to true and the keystore and keystore-password parameters must be set. The keystore parameter must point to a valid keystore file, and keystore-password is the password that is to be used to open the keystore.

The keystore-type parameter is optional and can be set to an alternate keystore type, if needed [keystore-type]. The cryptographic algorithm used is defined by ssl.KeyManagerFactory.algorithm in the Java security properties file. If this parameter is not set in the Java security properties file, then the SunX509 algorithm is used. More details about the Java security properties file can be found in the Java Security Guide [java-security].
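To make the roles of the keystore, its password, its type, and the key manager algorithm concrete, here is a minimal sketch using only the standard Java security APIs. It is not taken from Flume's source code; the class KeystoreSketch and its method names are hypothetical, and the example builds an empty in-memory keystore so that it needs no file on disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;

// Hypothetical sketch of opening a keystore the way an SSL server might.
class KeystoreSketch {

  // Opens a keystore of the given type (for example "JKS") with its password.
  static KeyStore open(InputStream in, String type, char[] password) {
    try {
      KeyStore keystore = KeyStore.getInstance(type);
      keystore.load(in, password);
      return keystore;
    } catch (GeneralSecurityException | IOException e) {
      throw new IllegalStateException("Could not open keystore", e);
    }
  }

  // Creates key managers with the JVM default algorithm, which is read from
  // the ssl.KeyManagerFactory.algorithm security property (SunX509 if unset).
  static KeyManagerFactory keyManagers(KeyStore keystore, char[] password) {
    try {
      KeyManagerFactory factory = KeyManagerFactory.getInstance(
          KeyManagerFactory.getDefaultAlgorithm());
      factory.init(keystore, password);
      return factory;
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("Could not create key managers", e);
    }
  }

  // Builds an empty JKS keystore in memory, protected by the password.
  static byte[] emptyKeystore(char[] password) {
    try {
      KeyStore keystore = KeyStore.getInstance("JKS");
      keystore.load(null, null);
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      keystore.store(out, password);
      return out.toByteArray();
    } catch (GeneralSecurityException | IOException e) {
      throw new IllegalStateException("Could not create keystore", e);
    }
  }

  public static void main(String[] args) {
    char[] password = "UsingFlume".toCharArray();
    KeyStore keystore = open(
        new ByteArrayInputStream(emptyKeystore(password)), "JKS", password);
    System.out.println("type: " + keystore.getType());
    System.out.println("algorithm: "
        + keyManagers(keystore, password).getAlgorithm());
  }
}
```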

Avro Sinks and Flume’s RPC clients can be configured to compress data before sending it to the Avro Source. This is especially useful for reducing bandwidth usage when the data is sent over a WAN or between data centers. Currently, the Avro Source supports only zlib compression for RPC. To enable the Avro Source to receive data in compressed form, set the compression-type parameter to deflate. If this parameter is not set or is set to none, Flume will not attempt to decompress the data. If compressed data is nevertheless sent to such a source, the source will not be able to parse it and will return an error to the previous hop. That hop will then retry forever, which can cause events to get backlogged there.

Avro Sources and Compression

If the compression-type parameter is set to deflate, the incoming data must be compressed, or else the source will not be able to parse the incoming data. The sink or Flume client sending the data must be configured to compress the data being sent. Therefore, if both compressed and uncompressed data will be sent to the same Flume agent, the agent should run two Avro Sources, one for receiving compressed data and another for receiving uncompressed data.

Also note that when the Avro Sink/Flume RPC client sends data to the Avro Source, it compresses the data batch by batch, not event by event, since this may provide a better compression ratio and reduce the CPU usage for compression.
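The difference between compressing batch by batch and event by event can be seen with the JDK's own zlib implementation. The sketch below is an assumption-laden illustration, not Flume's compression code: BatchCompressionSketch and the sample log lines are made up, and it compares only the compressed sizes, not CPU usage.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;

// Hypothetical sketch comparing per-event and per-batch zlib compression.
class BatchCompressionSketch {

  // Returns the number of bytes produced by zlib-compressing the input.
  static int compressedSize(byte[] input) {
    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    byte[] buffer = new byte[1024];
    int total = 0;
    while (!deflater.finished()) {
      total += deflater.deflate(buffer);
    }
    deflater.end();
    return total;
  }

  // Total size when every event is compressed on its own.
  static int perEventSize(List<byte[]> events) {
    int total = 0;
    for (byte[] event : events) {
      total += compressedSize(event);
    }
    return total;
  }

  // Size when all events are concatenated and compressed as one batch.
  static int batchSize(List<byte[]> events) {
    ByteArrayOutputStream all = new ByteArrayOutputStream();
    for (byte[] event : events) {
      all.write(event, 0, event.length);
    }
    return compressedSize(all.toByteArray());
  }

  public static void main(String[] args) {
    List<byte[]> events = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      events.add(("INFO request served, id=" + i)
          .getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("event by event: " + perEventSize(events) + " bytes");
    System.out.println("one batch:      " + batchSize(events) + " bytes");
  }
}
```

Because similar events share most of their content, a single compression pass over the batch can reuse that redundancy, while per-event compression pays the format overhead for every event.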

Here is an example of an Avro Source configured with SSL and compression. To disable SSL, simply remove the ssl parameter, and the remaining SSL-related parameters will be ignored:

agent.sources = avroSrc
agent.channels = memChannel

agent.sources.avroSrc.type = avro
agent.sources.avroSrc.channels = memChannel

# Bind to all interfaces
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4353

# Removing the next line will disable SSL
agent.sources.avroSrc.ssl = true
agent.sources.avroSrc.keystore = /tmp/keystore.jks
agent.sources.avroSrc.keystore-password = UsingFlume
agent.sources.avroSrc.keystore-type = jks

agent.sources.avroSrc.compression-type = deflate

# Initializes a memory channel with default configuration
agent.channels.memChannel.type = memory

An Avro Sink that writes to this source would have a configuration similar to the following:

agent.channels = avroSinkChannel
agent.sinks = avroSink

agent.channels.avroSinkChannel.type = memory

agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = avroSinkChannel
agent.sinks.avroSink.hostname = avrosrchost.example.com
agent.sinks.avroSink.port = 4353

# SSL properties
agent.sinks.avroSink.ssl = true
agent.sinks.avroSink.trust-all-certs = true
agent.sinks.avroSink.truststore = /path/to/keystore
agent.sinks.avroSink.truststore-password = UsingFlume
agent.sinks.avroSink.truststore-type = JKS

agent.sinks.avroSink.compression-type = deflate

Thrift Source

As mentioned in “Avro Source”, Flume’s use of Avro’s Java-specific RPC mechanism makes the Avro Source unable to accept data from non-JVM languages. As Flume became more popular, this use case had to be addressed. Therefore, Apache Thrift RPC support [thrift_ch3] was added to Flume. Thrift is a popular top-level project at the Apache Software Foundation that enables cross-language communication. The Thrift Sink–Thrift Source combination in Flume is designed to work pretty much exactly like the Avro Sink–Avro Source combination. Flume also has a Java Thrift RPC client that is part of the Flume SDK. The Thrift Source, in the simplest terms, is a multithreaded high-performance Thrift server. The Thrift interface definition language (IDL) that Flume uses is shown here:

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}

This IDL can be used to generate Thrift clients in any language that Thrift supports. The generated code can then be used to send data to Flume’s Thrift Source.

The configuration of the Thrift Source is extremely simple and mimics that of the Avro Source (see Table 3-4).

Table 3-4. Thrift Source configuration
Config parameter Default value Description

type

-

The alias for a Thrift Source is thrift. The FQCN, which is org.apache.flume.source.ThriftSource (case sensitive), can also be used.

bind

-

The IP address/hostname to bind to. To bind to all interfaces on the machine, use 0.0.0.0.

port

-

The port to bind to.

threads

-

The maximum number of threads this source should use for processing requests.

The bind parameter specifies the hostname/IP address of the interface to bind to; use 0.0.0.0 to bind to all interfaces. The port parameter specifies the port to use—this is the port clients would use to send events to this source. These are both mandatory parameters.

The threads parameter for a Thrift Source works in a slightly different way than for the Avro Source. Flume (as of version 1.4.0), by default, is built against and includes Thrift version 0.7.0. This was meant to support clients (programs written to use the Flume SDK) that would also write to HBase version 0.92 (or older) from the same process. If there is no requirement to support this version of HBase, then it is recommended that the Thrift version that comes with Flume be replaced with a newer version, though due to incompatibilities in the Thrift-generated code, Flume may also need to be recompiled against the newer version.

When using a version of Thrift lower than 0.8.0, Flume uses Thrift’s TThreadPoolServer, which uses one thread per client connected, and the threads parameter controls the maximum number of threads that the source will create, thus indirectly controlling the number of clients that can connect to the agent. It is recommended to not set this parameter in this case. If a newer version of Thrift is being used, then Flume uses Thrift’s TThreadedSelectorServer, which uses Java’s nonblocking I/O and therefore can support more clients than there are threads available. In this case, the threads parameter works just like the Avro Source’s threads parameter and can be used to keep the resource utilization under control.

The Thrift Source, unlike the Avro Source, does not currently support compression or SSL. A Thrift Source should therefore be used only to push data into Flume from systems that are written in non-JVM languages, or if the application that is writing the data already uses Thrift for other purposes. For Flume agent–to–Flume agent communication, it is recommended that the Avro Sink–Avro Source pair be used.

The following is an example of configuration of a Thrift Source:

agent.sources = thriftSrc
agent.channels = memChannel

agent.sources.thriftSrc.type = thrift
agent.sources.thriftSrc.channels = memChannel

# Bind to all interfaces
agent.sources.thriftSrc.bind = 0.0.0.0
agent.sources.thriftSrc.port = 4564

# Initializes a memory channel with default configuration
agent.channels.memChannel.type = memory

A Thrift Sink that writes to this source would have a configuration similar to the following:

agent.channels = memChannel
agent.sinks = thriftSink

agent.channels.memChannel.type = memory

agent.sinks.thriftSink.type = thrift
agent.sinks.thriftSink.channel = memChannel
agent.sinks.thriftSink.hostname = thriftsrchost.example.com
agent.sinks.thriftSink.port = 4564

Failure Handling in RPC Sources

Failure handling in both the Avro and Thrift Sources is a bit tricky. This is because the RPC sources are invoked by a client or sink on the other side of a network link, though it looks like a local method call. In all cases where the RPC sources cannot start due to some permanent error, like being unable to bind to the port, the source will throw an exception when it tries to start. Since the source was successfully configured, Flume’s configuration system will retry every few seconds to start it, so the source will start up once the condition causing the error no longer exists (for example, once the other process that was bound to the port has been killed or has released the port).

The trickier part, though, is with respect to the code that actually receives the data and writes the events to the channel. If even one of the channels the source is configured to write to throws a ChannelException due to the channel being full, or if the transaction is too large, the source returns a failure status to the client or sink that called it and expects it to retry. Since RPC sources receive data via threads owned by a thread pool, exceptions would simply cause the thread to die.

In all such cases, the real indication of failure is only in the log files where these exceptions are logged. Sometimes these exceptions may indicate a major problem, like the process running out of resources (as with an OutOfMemoryError). Therefore, it is important to monitor the logs generated by the Flume agent to ensure that things are running smoothly. ChannelExceptions being thrown too often can mean that the channels are underallocated for the rate of writes, or that the sinks are not clearing the data from the channels fast enough. Increasing the number of sinks can help if too few sinks are reading the data, but if the eventual destination itself cannot handle the load, the capacity needs to be rethought. In all cases, errors may cause duplicates but never actually cause data loss, since events are removed from the channel if and only if the data is actually successfully written out to the next hop.
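The claim that errors can cause duplicates but not data loss follows from removing events only after a confirmed write. The toy Java sketch below models that rule with plain lists. It is purely illustrative: AtLeastOnceSketch is a hypothetical name, and the "lost acknowledgment" flag stands in for any failure that happens after the next hop has already stored the batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of retry-until-confirmed delivery between two hops.
class AtLeastOnceSketch {

  // Delivers the batch to the next hop and returns what that hop stored.
  // If loseFirstAck is true, the first write succeeds downstream but the
  // sender sees a failure and therefore retries the same batch.
  static List<String> deliver(List<String> batch, boolean loseFirstAck) {
    List<String> channel = new ArrayList<>(batch);   // events awaiting delivery
    List<String> destination = new ArrayList<>();    // what the next hop stored
    boolean ackLost = loseFirstAck;
    while (!channel.isEmpty()) {
      destination.addAll(channel);  // the next hop stores the batch
      if (ackLost) {
        ackLost = false;            // sender saw an error: keep events, retry
        continue;
      }
      channel.clear();              // confirmed: only now remove the events
    }
    return destination;
  }

  public static void main(String[] args) {
    List<String> batch = Arrays.asList("event1", "event2");
    System.out.println(deliver(batch, false));
    System.out.println(deliver(batch, true));
  }
}
```

In the failure case every event still arrives, but some arrive twice, which is why downstream systems may need to tolerate or remove duplicates.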

HTTP Source

Flume comes bundled with an HTTP Source that can receive events via HTTP POST. For application environments where it might not be possible to deploy the Flume SDK and its dependencies, or in cases where the client code prefers to send data over HTTP rather than over Flume’s RPC, the HTTP Source can be used to receive data into Flume. From a client’s point of view, an HTTP Source behaves exactly like a web server that accepts Flume events.

The HTTP Source takes in a few configuration parameters, as shown in Table 3-5. The configuration of the source is extremely simple and allows the user to also configure the handler that is plugged in.

Table 3-5. HTTP Source configuration
Config parameter Default value Description

type

-

The alias for the HTTP Source is http. The FQCN, which is org.apache.flume.source.http.HTTPSource (case sensitive), can also be used.

bind

-

The IP address/hostname to bind to. To bind to all interfaces on the machine, use 0.0.0.0.

port

-

The port to bind to.

enableSSL

false

To enable SSL, this parameter should be set to true.

keystore

-

The path to the keystore file to be used.

keystorePassword

-

The password to be used for accessing the keystore.

handler

JSONHandler

The FQCN of the handler class that should be used by the HTTP Source to convert the HTTP request into Flume events. See “Writing Handlers for the HTTP Source*” to learn how to write handlers for the HTTP Source.

handler.*

-

Any parameters that have to be passed to the handler can be passed in through the configuration by using the handler. prefix.

As is to be expected, the bind and port parameters define the interface and the port the source binds to. This is the hostname and port that the client sends data to.

The HTTP Source also supports SSL for secure transport. By default, the source does not use SSL. To enable SSL, set the enableSSL parameter to true. If SSL is enabled, the keystore and keystorePassword parameters are mandatory. The keystore parameter is the full path to the keystore to be used for SSL. keystorePassword is the password that is to be used to access the keystore.

An example of how to configure an HTTP Source that uses a custom handler is shown in Example 3-3.

Example 3-3. HTTP Source configuration example
agent.sources = httpSrc
agent.channels = memChannel

agent.sources.httpSrc.type = http
agent.sources.httpSrc.channels = memChannel

# Bind to all interfaces
agent.sources.httpSrc.bind = 0.0.0.0
agent.sources.httpSrc.port = 4353

# Removing this line will disable SSL
agent.sources.httpSrc.enableSSL = true
agent.sources.httpSrc.keystore = /tmp/keystore
agent.sources.httpSrc.keystorePassword = UsingApacheFlume

agent.sources.httpSrc.handler = usingflume.ch03.HTTPSourceXMLHandler
agent.sources.httpSrc.handler.insertTimestamp = true

# Initializes a memory channel with default configuration
agent.channels.memChannel.type = memory

What parameters the handler requires and how they are used by the handler depends on the specific handler implementation. Please consult the handler’s documentation for details on this.

Writing Handlers for the HTTP Source*

It is easy for the user to develop and plug in a handler to convert the data received into Flume events. This allows the HTTP Source to accept data from clients in any format that can be processed by the handler. The HTTP Source handler is a class that implements a very simple interface, HTTPSourceHandler:

package org.apache.flume.source.http;
public interface HTTPSourceHandler extends Configurable {
  public List<Event> getEvents(HttpServletRequest request) throws
          HTTPBadRequestException, Exception;
}

The handler interface is extremely simple and has only one method, getEvents, which accepts the HttpServletRequest sent by the client and returns a list of Flume events. Even though this handler interface is simple, it can essentially do any arbitrary processing to convert the input data from the HttpServletRequest into Flume events. The amount of processing should be limited, though, or the client sending data to this source might get timed out. The handler is configurable through Flume’s standard configuration mechanism. Since the HTTP Source always uses exactly one transaction per request whatever the handler is, the sender has to be careful to send only as many events as the channels support.

The handler is responsible for making sure that the configuration parameters passed to it are valid. The HTTP Source will instantiate and configure the handler on startup. Since the HTTP Source propagates any Exception thrown by the handler to the configuration system, the handler must verify the parameters and apply the parameters in the configure method. The parameters are passed in to the configure method via a Context instance. Context instances are simply key-value pairs containing various configuration keys and their values. If the configuration passed in is invalid and cannot be applied successfully, the HTTP Source rethrows the exception thrown by the handler to the configuration system, which in turn disables the HTTP Source and removes it from the agent.

While processing incoming data, the HTTP Source handles exceptions thrown by the handler by returning a failure to the client. The HTTP Source expects the handler to throw an HTTPBadRequestException if the incoming data was malformed and cannot be converted into Flume events. This operation must be idempotent, and the handler must throw the same exception for the same input every time. If an HTTPBadRequestException is thrown by the handler, the HTTP Source returns HTTP error code 400, to inform the client that the request was malformed. If the handler throws any other exception, the source returns HTTP error code 500, to inform the client that there was an internal error in the HTTP Source. It is then up to the client to decide how to retry in such a case. If one of the channels that the source is writing to throws a channel exception, the source returns error code 503, to signal that the channel is temporarily at capacity and the client should retry later.
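From the client's side, the status codes described above suggest a simple decision table. The following Java sketch is one possible reading of it, not part of Flume or its SDK; the class HttpSourceResponseSketch and the Action names are hypothetical.

```java
// Hypothetical client-side helper for interpreting HTTP Source responses.
class HttpSourceResponseSketch {

  enum Action { DONE, FIX_REQUEST, RETRY_LATER, CLIENT_DECIDES }

  // Maps an HTTP status code to what the sending client might do next.
  static Action actionFor(int status) {
    if (status >= 200 && status < 300) {
      return Action.DONE;            // the request was accepted
    }
    if (status == 400) {
      return Action.FIX_REQUEST;     // malformed data: resending as-is fails again
    }
    if (status == 503) {
      return Action.RETRY_LATER;     // channel at capacity: back off, then retry
    }
    return Action.CLIENT_DECIDES;    // for example 500: internal error in the source
  }

  public static void main(String[] args) {
    for (int status : new int[] {200, 400, 500, 503}) {
      System.out.println(status + " -> " + actionFor(status));
    }
  }
}
```

The 400 case is the one where an automatic retry is pointless, since the handler is expected to reject the same input the same way every time.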

The JAR file containing the handler (or the handler’s .class file) and all its dependencies should be added to the Flume classpath via the plugins.d mechanism discussed in “Deploying Custom Code”.

If no handler is specified in the configuration, the HTTP Source uses a handler that comes bundled with Flume, which can handle JSON formatted events. Each request can contain several events represented as an array, though the channel(s) the source writes to might have a limited transaction capacity. The handler accepts JSON-formatted data in the UTF-8, UTF-16, or UTF-32 charset, and converts it into a list of events with the body serialized in the charset of the original HTTP request. The format that the handler accepts is shown here:

[{
    "headers" : {
		"event1Header1" : "event1Value1",
		"event1header2" : "event1Value2"
	},
	"body" : "This is the body of the first event."

},
{
	"headers" : {
		"event2Header1" : "event2Value1",
		"event2Header2" : "event2Value2"
	},
	"body" : "This is the body of the second event"
}]
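A client could assemble a request body in the format above with a few lines of plain Java. The sketch below is a hedged illustration that avoids any JSON library: JsonEventsSketch and its methods are hypothetical, and the escaping covers only quotes, backslashes, and the common whitespace escapes, so a real client would more likely rely on a proper JSON library.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds the JSON array format shown above.
class JsonEventsSketch {

  // Wraps a string in double quotes, escaping a minimal set of characters.
  static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      switch (c) {
        case '"':  sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:   sb.append(c);
      }
    }
    return sb.append('"').toString();
  }

  // Serializes one event as {"headers":{...},"body":"..."}.
  static String event(Map<String, String> headers, String body) {
    StringJoiner joined = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, String> entry : headers.entrySet()) {
      joined.add(quote(entry.getKey()) + ":" + quote(entry.getValue()));
    }
    return "{\"headers\":" + joined + ",\"body\":" + quote(body) + "}";
  }

  // Wraps already-serialized events in a JSON array, one request body.
  static String payload(List<String> events) {
    return "[" + String.join(",", events) + "]";
  }

  public static void main(String[] args) {
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("event1Header1", "event1Value1");
    String first = event(headers, "This is the body of the first event.");
    String second = event(new LinkedHashMap<>(), "This is the second event");
    System.out.println(payload(Arrays.asList(first, second)));
  }
}
```

The resulting string would be sent as the body of an HTTP POST to the host and port the source is bound to, keeping the number of events per request within the transaction capacity of the channel(s).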

HTTPSourceXMLHandler, shown in Example 3-5, is another example of a handler that works with the HTTP Source. This handler converts XML-formatted data into Flume events and expects the data to be in the simple XML format shown in Example 3-4. Only data in between <events> and </events> is processed. Each event is expected to be between <event> and </event> tags. The only thing that limits the number of events per request is the transaction capacity of the channel(s) the source writes to. Each event can have one or more headers sections, each enclosed by <headers> and </headers> tags. Each header is denoted by a tag whose name is used as the header name; the value in between the opening header name tag and the closing tag is used as its value. The body is enclosed between <body> and </body> tags.

Example 3-4. Format expected by HTTPSourceXMLHandler
<events>
  <!-- This can contain as many events
  as the channel can support in a transaction -->

  <event>
    <headers>
      <header1>value1</header1>
      <header2>value2</header2>
    </headers>

    <body>This is a test.
        This input should show up in an event.
    </body>
  </event>

  <event>
    <!-- There can be zero or more headers sections.
     They are merged together, so each header name
     must be unique even between sections. -->
    <headers>
      <event2Header1>event2Value1</event2Header1>
    </headers>

    <!-- Each event can have only one body -->

    <body>This is the 2nd event.</body>

    <headers>
      <event2Header2>event2Value2</event2Header2>
    </headers>
  </event>
</events>

The handler parses the XML-formatted events into Flume events and returns them to the HTTP Source, which in turn writes them to the channel(s). While parsing the events, the handler makes sure that each event has at least a header and a body. If not, the handler throws an HTTPBadRequestException to inform the client that the incoming data was malformed. It can be configured to insert a timestamp into the Flume event headers, which the HDFS Sink can use for bucketing of events.

Example 3-5. XML handler for HTTP Source
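package usingflume.ch03;

// Imports for the types used in this listing
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.flume.Event;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;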

/**
 * A handler for the HTTP Source that accepts XML-formatted data.
 * Each event can contain multiple header nodes,
 * but exactly one body node. If there is
 * more than one body tag, the first one in the event is picked up.
 */
public class HTTPSourceXMLHandler implements HTTPSourceHandler {

  private final String ROOT = "events";
  private final String EVENT_TAG = "event";
  private final String HEADERS_TAG = "headers";
  private final String BODY_TAG = "body";

  private final String CONF_INSERT_TIMESTAMP = "insertTimestamp";
  private final String TIMESTAMP_HEADER = "timestamp";

  private final DocumentBuilderFactory documentBuilderFactory
    = DocumentBuilderFactory.newInstance();

  private final ThreadLocal<DocumentBuilder> docBuilder
    = new ThreadLocal<DocumentBuilder>();

  private boolean insertTimestamp;

  @Override
  public List<Event> getEvents(HttpServletRequest
    httpServletRequest) throws HTTPBadRequestException, Exception {
    if (docBuilder.get() == null) {
      docBuilder.set(documentBuilderFactory.newDocumentBuilder());
    }
    Document doc;
    final List<Event> events;
    try {
      doc = docBuilder.get().parse(
        httpServletRequest.getInputStream());
      Element root = doc.getDocumentElement();
      root.normalize();

      // Verify that the root element is "events"
      Preconditions.checkState(
        ROOT.equalsIgnoreCase(root.getTagName()));

      NodeList nodes = root.getElementsByTagName(EVENT_TAG);
      int eventCount = nodes.getLength();
      events = new ArrayList<Event>(eventCount);
      for (int i = 0; i < eventCount; i++) {
        Element event = (Element) nodes.item(i);
        // Get all headers. If there are multiple header sections,
        // combine them.
        NodeList headerNodes
          = event.getElementsByTagName(HEADERS_TAG);
        Map<String, String> eventHeaders
          = new HashMap<String, String>();
        for (int j = 0; j < headerNodes.getLength(); j++) {
          Node headerNode = headerNodes.item(j);
          NodeList headers = headerNode.getChildNodes();
          for (int k = 0; k < headers.getLength(); k++) {
            Node header = headers.item(k);

            // Read only element nodes
            if (header.getNodeType() != Node.ELEMENT_NODE) {
              continue;
            }
            // Make sure a header is inserted only once,
            // else the event is malformed
            Preconditions.checkState(
              !eventHeaders.containsKey(header.getNodeName()),
              "Header expected only once " + header.getNodeName());
            eventHeaders.put(
              header.getNodeName(), header.getTextContent());
          }
        }
        Node body = event.getElementsByTagName(BODY_TAG).item(0);
        if (insertTimestamp) {
          eventHeaders.put(TIMESTAMP_HEADER, String.valueOf(System
            .currentTimeMillis()));
        }
        events.add(EventBuilder.withBody(
          body.getTextContent().getBytes(
            httpServletRequest.getCharacterEncoding()),
          eventHeaders));
      }
    } catch (SAXException ex) {
      throw new HTTPBadRequestException(
        "Request could not be parsed into valid XML", ex);
    } catch (Exception ex) {
      throw new HTTPBadRequestException(
        "Request is not in expected format. " +
          "Please refer documentation for expected format.", ex);
    }
    return events;
  }

  @Override
  public void configure(Context context) {
    insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP,
      false);
  }
}

An example of XML handler configuration is shown in Example 3-3. This configuration instructs the handler to insert the timestamp of processing into each event. In general, any number of parameters can be passed to HTTP Source handlers in this way.

The HTTPSourceHandler interface is part of the flume-ng-core Maven artifact, which can be added to your application by including Example 3-6 in the dependencies section of your application’s pom.xml file.

Example 3-6. Including the flume-ng-core artifact in your application
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.0</version>
    </dependency>

Use the plugins.d framework shown in “Deploying Custom Code” to deploy custom HTTP handlers to Flume agents.

Spooling Directory Source

In many scenarios, applications generate data that gets written to files. Often, these files are not simply text, or converting each line into a single event does not make sense because a group of lines together makes up one event. An example of this is stack traces. It is often difficult or impossible to modify these applications to use the Flume Client API to send data directly to Flume. In such cases, Flume’s Spooling Directory Source can be used to read data from each of these files.

A Spooling Directory Source watches a directory, from which it reads events. The source expects files in the directory to be immutable, though new files can be added to the directory in real time. Once a file is moved to the directory, it should not be written to. If you’re dealing with log files, a good way of doing this is to configure your logging system to move the file when it is being rolled. Also, the source expects that filenames are never reused. If a file is modified after it is placed in the directory, or a filename is reused, the source will throw an exception and quit. The only way to restart the source at this point is to restart the agent itself.
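One way to honor the immutability requirement is to finish writing a file somewhere else and only then move it into the watched directory. The following is a minimal sketch of that idea, not something Flume provides: SpoolDrop and deliver are hypothetical names, and the staging directory is assumed to be on the same filesystem as the spooling directory so that an atomic move is possible.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: hands a finished file to a spooling directory. */
final class SpoolDrop {

  /**
   * Writes the contents to a staging directory first, then moves the
   * finished file into the spooling directory in a single step, so the
   * source never sees a partially written file.
   */
  static Path deliver(Path stagingDir, Path spoolDir,
      String uniqueName, byte[] contents) {
    Path target = spoolDir.resolve(uniqueName);
    // Refuse to overwrite a file that is still waiting to be ingested.
    // This cannot detect names that were already ingested and renamed,
    // so names should carry a timestamp or sequence number.
    if (Files.exists(target)) {
      throw new IllegalStateException("File already spooled: " + target);
    }
    try {
      Path staged = stagingDir.resolve(uniqueName);
      Files.write(staged, contents);
      // ATOMIC_MOVE fails if the two directories are on different
      // filesystems, rather than silently falling back to a copy.
      return Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A call such as SpoolDrop.deliver(staging, spool, "app.log.1", bytes) returns the final path of the file inside the spooling directory.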

The Spooling Directory Source is a good alternative to using an Exec Source with tail -F, as discussed later in this chapter, since this source guarantees data delivery and is generally more reliable than using tail -F with an Exec Source. The only downside is that the data is not tailed in real time, and is read only once the file is closed and moved to the relevant directory. Once a file is completely consumed by the source and all its events successfully written to the source’s channel(s), the source can either rename the file or delete the file, based on the configuration. When the file is renamed, the source simply adds a suffix to the filename, rather than changing it completely. The suffix is configurable as well.

The Spooling Directory Source uses a tracker persisted to disk to track the location within each file at which events were successfully written out to the channel, so that the source can start reading data from that position if the agent or machine fails and restarts. This allows the source to track which file it is processing at any point in time and resume processing that file from the last processed location when the source restarts. This is one of the reasons the source does not allow filenames to be reused.

Table 3-6 shows the various configuration parameters accepted by the Spooling Directory Source.

Table 3-6. Spooling Directory Source configuration
Parameter Default Description

type

-

The alias for the Spooling Directory Source is spooldir. The FQCN is org.apache.flume.source.SpoolDirectorySource.

spoolDir

-

The directory to “watch” and read files from. Subdirectories of this directory are not scanned.

batchSize

100

The maximum number of events to write to the channel per transaction.

ignorePattern

^$

Files with names matching this regex are ignored and data is not read from them.

deletePolicy

never

When to delete ingested files—must be never or immediate.

fileSuffix

.COMPLETED

The suffix to use for files that have been completely ingested. The “.” is required if this is to be an extension.

fileHeader

false

If set to true, the filename is added to the event headers.

fileHeaderKey

file

The key to use in the headers, if the filename is added to headers.

trackerDir

.flumespool

The directory where the Spooling Directory Source stores the metadata that is used to restart the source from where it left off.

deserializer

line

The alias or FQCN of the Builder class that can be used to build the deserializer that should be used to read custom-formatted data. “Reading Custom Formats Using Deserializers*” explains how to write deserializers.

deserializer.*

-

Any parameters to be passed to the deserializer.

inputCharset

UTF-8

The character set to use when the deserializer calls readChar.

The type of the Spooling Directory Source is spooldir. As mentioned earlier, this source reads all files in a given directory and processes them one by one. The full path to the directory to process should be passed in via the spoolDir parameter. For performance reasons, the source writes events in batches. The maximum size of each batch is defined by the batchSize parameter. The source attempts to read as many events as it can from the file until the specified batch size is reached. If there are fewer events available in the files, it will commit the transaction as soon as all events are read from the files.

Sometimes, there are files that get written to the same directory that may actually not contain data, like metadata files. To avoid ingesting such files, which are known to not contain valid data, an ignore pattern can be specified via the ignorePattern parameter. This parameter takes a regex, and any files with filenames matching this regex are ignored.

As mentioned earlier, once the files are completely ingested, Flume can either rename or delete the files. To delete the files immediately, set the value of the deletePolicy parameter to immediate. If deletePolicy is set to never (the default), the file is renamed once ingested with the suffix specified by the fileSuffix parameter appended to the original name of the file. Any files that use this suffix for completed files are ignored, so be careful to not use a file suffix that could be the suffix of new files that get written to the directory.
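The combined effect of ignorePattern and fileSuffix can be pictured as a filename filter. The sketch below is illustrative only: SpoolFileFilter is a hypothetical class, and it assumes that the regex has to match the entire filename, which is why the default ^$ ignores nothing but an empty name.

```java
import java.util.regex.Pattern;

/** Hypothetical filter mirroring the ignorePattern and fileSuffix rules. */
final class SpoolFileFilter {
  private final Pattern ignorePattern;
  private final String completedSuffix;

  SpoolFileFilter(String ignoreRegex, String completedSuffix) {
    this.ignorePattern = Pattern.compile(ignoreRegex);
    this.completedSuffix = completedSuffix;
  }

  /** Returns true if a file with this name should be read by the source. */
  boolean shouldIngest(String fileName) {
    // Files carrying the completed suffix were already ingested.
    if (fileName.endsWith(completedSuffix)) {
      return false;
    }
    // Assumption: the whole filename must match the ignore regex.
    return !ignorePattern.matcher(fileName).matches();
  }

  public static void main(String[] args) {
    SpoolFileFilter filter = new SpoolFileFilter("^.*\\.meta$", ".COMPLETED");
    System.out.println(filter.shouldIngest("app.log.1"));
    System.out.println(filter.shouldIngest("app.log.1.COMPLETED"));
    System.out.println(filter.shouldIngest("index.meta"));
  }
}
```

The second check shows why a suffix that new files might legitimately end with is dangerous: such files would be skipped as if they had already been ingested.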

When a file is processed and events are generated from the file, it is often beneficial for processing systems to know which file the events came from (for example, showing the filename a stack trace belongs to in a search UI). The full path and the filename can be included by setting the fileHeader parameter to true. The key to use in the headers can be set using the fileHeaderKey parameter (this defaults to file).

The Spooling Directory Source is able to recover from where it left off, so as to avoid duplicates but still consume all data from the file. This is made possible by persisting information to disk about the file it is processing and reading this information when the source starts up. This information is persisted in the tracker directory, which is always inside the directory that this source is watching. The default name of the tracker directory is .flumespool. This can be changed using the trackerDir parameter, whose value is used as a path relative to the directory that the source is watching. If the value of this parameter is changed after the source has started using it (even after shutting down Flume), the source will no longer be able to track its position in the file it was processing, and that file might end up being processed again from the beginning, causing duplicates. So, once this is set, it should not be changed.

Example 3-7 shows an example of an agent configured to use the Spooling Directory Source to read data from a directory on disk in batches of 250 events each. The source deletes files as soon as they are completely ingested. It also inserts a header with the filename, with the key usingFlumeFiles.

Example 3-7. Spooling Directory Source configuration example
agent.sources = spool
agent.channels = memChannel

agent.sources.spool.type = spooldir
agent.sources.spool.channels = memChannel

agent.sources.spool.spoolDir = /data/flume/spool
agent.sources.spool.batchSize = 250

agent.sources.spool.deletePolicy = immediate
agent.sources.spool.fileHeader = true
agent.sources.spool.fileHeaderKey = usingFlumeFiles
agent.sources.spool.deserializer = usingflume.ch03.ProtobufDeserializer$ProtobufDeserializerBuilder

agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 10000
agent.channels.memChannel.transactionCapacity = 500

Reading Custom Formats Using Deserializers*

The source deserializes data in the files in the directory using a pluggable deserializer, allowing the source to read and interpret the data from these files into events in different ways. For example, a deserializer that “understands” Avro could read Avro container formatted files and convert each Avro message to a Flume event, or several lines could be read at once, until an entire stack trace is read and then converted to Flume events. Once all data in a file is read, this source can either delete or rename the file with a new extension so that the same file is not processed again.

To use a custom deserializer, set the value of the deserializer parameter to an implementation of EventDeserializer$Builder that can build the EventDeserializer implementation to use. The deserializer can be configured by passing in parameters with the deserializer. prefix. Text-based deserializers can call the readChar method to read a character. The way a character is represented differs by character set. To tell the source what character set to use, set inputCharset to the name of the character set, which by default is UTF-8.

Deserializers implement the EventDeserializer interface, and should also provide a Builder class, which must implement the EventDeserializer$Builder interface. The Builder must have a public, no-argument constructor that the Flume framework can use to instantiate the builder. The Builder’s build method must create and return a fully configured instance of the deserializer.

A Context instance and an instance of ResettableInputStream are passed to this method. The Context instance can be used to configure the deserializer. The deserializer is expected to deserialize events from the input stream. ResettableInputStream is an interface that is meant to read data from the stream, but also gives the ability to roll back to a previous location in the stream. An instance of the ResettableInputStream class guarantees that a reset call will reset the reads from this stream to the position in the stream at which the last mark call happened, regardless of how many bytes were read from the stream using the read or readChar methods after the last mark call. This allows the Spooling Directory Source to reread events if writes to a channel failed and the events could not be written. The deserializer can use this functionality in its own mark and reset methods to ensure it rolls back to the correct location within the stream.
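The mark and reset contract described above can be illustrated without any Flume classes. In this sketch, InMemoryResettableStream is a hypothetical stand-in for a resettable stream over a byte array. It is not Flume’s ResettableInputStream and it persists nothing to disk.

```java
import java.nio.charset.StandardCharsets;

/** Illustrates mark/reset semantics with a hypothetical in-memory stream. */
final class MarkResetSketch {

  /** Stand-in for a resettable stream; remembers one marked position. */
  static final class InMemoryResettableStream {
    private final byte[] data;
    private int position;
    private int markedPosition;

    InMemoryResettableStream(byte[] data) {
      this.data = data;
    }

    /** Returns the next byte as 0-255, or -1 at the end of the data. */
    int read() {
      return position < data.length ? (data[position++] & 0xff) : -1;
    }

    /** Remembers the current position as the last safe point. */
    void mark() {
      markedPosition = position;
    }

    /** Rewinds to the last marked position, however much was read since. */
    void reset() {
      position = markedPosition;
    }
  }

  /** Reads up to max bytes and returns them as ASCII text. */
  static String readChunk(InMemoryResettableStream in, int max) {
    StringBuilder chunk = new StringBuilder();
    for (int i = 0; i < max; i++) {
      int b = in.read();
      if (b == -1) {
        break;
      }
      chunk.append((char) b);
    }
    return chunk.toString();
  }

  public static void main(String[] args) {
    InMemoryResettableStream in = new InMemoryResettableStream(
        "abcdef".getBytes(StandardCharsets.US_ASCII));
    readChunk(in, 2);                  // first batch, written successfully
    in.mark();                         // commit point
    String attempt = readChunk(in, 3); // second batch
    in.reset();                        // pretend the channel write failed
    String retry = readChunk(in, 3);   // same bytes are read again
    System.out.println(attempt.equals(retry));
  }
}
```

A caller that marks only after a successful channel write, and resets after a failed one, gets the reread behavior that the source relies on.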

The deserializer implements two other methods that are called by the source to read events from the stream—the readEvent and readEvents methods. The readEvent method must return exactly one event from the stream, while the readEvents method takes an argument that is the maximum number of events it must read from the stream.

Example 3-9 shows a deserializer that deserializes messages serialized as Protocol Buffer (Protobuf) messages based on the format shown in Example 3-8. Each Protobuf message is written to the file after its length is written to the file as a 4-byte integer.

Example 3-8. Protobuf format used by ProtobufDeserializer
option java_package = "usingflume.ch03";
option java_outer_classname = "UsingFlumeEvent";

message Event {
  repeated Header header = 1;
  required bytes body = 2;
}

message Header {
  required string key = 1;
  required string val = 2;
}

Example 3-9. ProtobufDeserializer: a class that deserializes data written as Protobuf messages
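package usingflume.ch03;

import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
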
public class ProtobufDeserializer implements EventDeserializer {
  private final ResettableInputStream stream;
  private boolean isOpen;

  private ProtobufDeserializer(ResettableInputStream stream) {
    // No configuration to do, so ignore the context.
    this.stream = stream;
    isOpen = true;
  }

  @Override
  public Event readEvent() throws IOException {
    throwIfClosed();
    // The format is expected to be:
    // <length of message> - a 4-byte int
    // <protobuf message> - written using writeTo (not delimited)
    // Read the length first, then read exactly that many bytes into
    // an array, wrap the array in a stream, and pass it to the
    // Protobuf parseFrom method.
    // We assume here that the file is well-formed and that neither
    // the length nor the message is partially cut off.
    byte[] sz = new byte[4];
    if (stream.read(sz, 0, 4) != -1) {
      int length = ByteBuffer.wrap(sz).getInt();
      byte[] data = new byte[length];
      stream.read(data, 0, data.length);
      UsingFlumeEvent.Event protoEvent =
        UsingFlumeEvent.Event.parseFrom(new ByteArrayInputStream(data));
      List<UsingFlumeEvent.Header> headerList
        = protoEvent.getHeaderList();
      Map<String, String> headers = new HashMap<String, String>(
        headerList.size());
      for (UsingFlumeEvent.Header hdr : headerList) {
        headers.put(hdr.getKey(), hdr.getVal());
      }
      return EventBuilder.withBody(protoEvent.getBody().toByteArray(), headers);
    }
    return null;
  }

  @Override
  public List<Event> readEvents(int count) throws IOException {
    throwIfClosed();
    List<Event> events = new ArrayList<Event>(count);
    for (int i = 0; i < count; i++) {
      Event e = readEvent();
      if (e == null) {
        break;
      }
      events.add(e);
    }
    return events;
  }

  @Override
  public void mark() throws IOException {
    throwIfClosed();
    stream.mark();
  }

  @Override
  public void reset() throws IOException {
    throwIfClosed();
    stream.reset();
  }

  @Override
  public void close() throws IOException {
    isOpen = false;
    stream.close();
  }

  private void throwIfClosed() {
    Preconditions.checkState(isOpen, "Deserializer is closed!");
  }

  public static class ProtobufDeserializerBuilder implements Builder {

    @Override
    public EventDeserializer build(Context context,
      ResettableInputStream resettableInputStream) {
      // The deserializer does not need any configuration,
      // so ignore the Context instance. If some configuration has
      // to be passed to the deserializer, this Context instance can be used.
      return new ProtobufDeserializer(resettableInputStream);
    }
  }
}

This ProtobufDeserializer class reads Protobuf-serialized events from the file and converts them into Flume events in the readEvent method, returning null when no more events are available to be read. If no events could be read by the readEvents method, an empty list is returned, as is mandated by the EventDeserializer interface.
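The framing that readEvent expects, a 4-byte length followed by the message bytes, can be produced and checked with plain JDK classes. In this sketch, LengthPrefixedFrames is a hypothetical helper, and the byte arrays stand in for serialized Protobuf messages (for example, the result of calling toByteArray on a message), so no Protobuf dependency is needed to try it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: writes and reads length-prefixed messages. */
final class LengthPrefixedFrames {

  /** Writes each message as a 4-byte length followed by its bytes. */
  static byte[] write(List<byte[]> messages) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (byte[] message : messages) {
        // writeInt is big-endian, which is also the default byte
        // order used by ByteBuffer.getInt() on the reading side.
        out.writeInt(message.length);
        out.write(message);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads back every message; assumes the input is well-formed. */
  static List<byte[]> read(byte[] file) {
    List<byte[]> messages = new ArrayList<>();
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(file))) {
      while (in.available() > 0) {
        byte[] message = new byte[in.readInt()];
        // readFully blocks until the whole message has been read.
        in.readFully(message);
        messages.add(message);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return messages;
  }
}
```

In a real writer, the output stream would wrap a file that is moved into the spooling directory only after it has been closed.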

Since the file is immutable, once we reach a stage where no more events are available in the file, all the events have been read out from the file, at which point the source closes the deserializer by calling the close method. If the deserializer is maintaining any internal state or has some cleanup to do, this method is expected to do that. In this case, we simply close the stream. The mark and reset methods simply check that the deserializer is open and forward the calls to the stream. This specific implementation of the deserializer does not need any configuration, but deserializers can receive configuration via the Context instance passed to the builder, which in turn can be passed through a constructor to the deserializer instance. Example 3-7 showed a Spooling Directory Source configured to use ProtobufDeserializer.

Flume comes bundled with a handful of deserializers. The default deserializer is the LineDeserializer [line-deserializer]. This is an example of a deserializer that accepts configuration. The line deserializer is enabled if no deserializer is set for the Spooling Directory Source or if the value of the deserializer parameter is set to line. The line deserializer reads the file line by line, converting each line to an event, based on a configurable character set (UTF-8 by default). Table 3-7 lists the configuration parameters.

Table 3-7. Line deserializer configuration
Parameter Default Description

outputCharset

UTF-8

The charset to use to convert the characters read from the file to the byte array that is set as the event body.

maxLineLength

2048

The maximum number of characters to return per line. If the line is longer than this, it is truncated.

Another deserializer that comes bundled with Flume is the AvroEventDeserializer. To use this deserializer, set the deserializer parameter to avro. The Avro deserializer can read Avro container files and send data out as Avro-formatted events. There is only one configuration parameter for this deserializer, as described in Table 3-8.

Table 3-8. Avro deserializer configuration
Parameter Default Description

schemaType

flume.avro.schema.hash

This can be set to either flume.avro.schema.hash or flume.avro.schema.literal. Setting this to flume.avro.schema.hash causes the 64-bit Rabin fingerprint of the schema to be inserted in the headers with the key flume.avro.schema.hash. If the value of this parameter is set to flume.avro.schema.literal, the entire JSONified schema is inserted into the header with the flume.avro.schema.literal key.

To interpret the data, it is important to know the schema used. Though this information is contained in the file itself, it is important to keep the schema with each event so that it is possible to read the data from the event. So, this deserializer supports inserting the Avro schema in the headers or simply putting the 64-bit Rabin fingerprint of the schema (as specified in the Avro specification [schema-fp]) in the headers, which can later be used to look up a schema registry that is indexed on the schema fingerprint. (Imagine a situation where there is Avro-formatted data that is going to be in one of several known schemas, which is mostly the case. Using this fingerprint in the headers allows the user to identify which schema should be used to read the message, and thus this can be used by a serializer while writing the data.) To set the fingerprint in the event headers, set the schemaType parameter to flume.avro.schema.hash. The schema fingerprint is written to a header with the key flume.avro.schema.hash.
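The fingerprint-indexed lookup can be sketched with the 64-bit Rabin fingerprint algorithm given in the Avro specification. SchemaFingerprints and its Registry class are hypothetical names. The Avro specification computes the fingerprint over the schema’s Parsing Canonical Form, whereas this sketch fingerprints whatever schema text it is given, and it makes no assumption about how Flume encodes the value in the header.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: CRC-64-AVRO fingerprints and a schema registry. */
final class SchemaFingerprints {

  /** Fingerprint of the empty input, as given in the Avro specification. */
  static final long EMPTY = 0xc15d213aa4d7a795L;

  private static final long[] TABLE = new long[256];

  static {
    // Precompute the effect of each possible byte value.
    for (int i = 0; i < 256; i++) {
      long fp = i;
      for (int j = 0; j < 8; j++) {
        fp = (fp >>> 1) ^ (EMPTY & -(fp & 1L));
      }
      TABLE[i] = fp;
    }
  }

  /** Computes the 64-bit Rabin fingerprint of the given bytes. */
  static long fingerprint64(byte[] buf) {
    long fp = EMPTY;
    for (byte b : buf) {
      fp = (fp >>> 8) ^ TABLE[(int) (fp ^ b) & 0xff];
    }
    return fp;
  }

  /** In-memory registry indexed on the schema fingerprint. */
  static final class Registry {
    private final Map<Long, String> schemas = new HashMap<>();

    /** Stores the schema and returns the fingerprint it is filed under. */
    long register(String schemaJson) {
      long fp = fingerprint64(schemaJson.getBytes(StandardCharsets.UTF_8));
      schemas.put(fp, schemaJson);
      return fp;
    }

    /** Returns the schema for a fingerprint, or null if it is unknown. */
    String lookup(long fingerprint) {
      return schemas.get(fingerprint);
    }
  }
}
```

A consumer that reads the fingerprint from an event header can then fetch the full schema with a single lookup instead of carrying the schema in every event.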

If the entire JSON-ified schema should be written to every event, set schemaType to flume.avro.schema.literal. In this case, the entire schema is written with the key flume.avro.schema.literal. Writing the schema in every event’s header is pretty inefficient since it increases the event size, especially if there are a limited number of schema types.

To read files from a directory as a binary large object (BLOB) [blob], the blob deserializer can be used. The FQCN of the blob deserializer is org.apache.flume.sink.solr.morphline.BlobDeserializer. For each file, this deserializer attempts to read the data in blobs of at most a configured number of bytes. It takes only one configuration parameter, maxBlobLength, which is the maximum size, in bytes, of each blob. If a file is larger than this, the file is split up into several blobs, each with a size less than or equal to the configured maximum. This deserializer buffers all blobs in a batch in memory, so the batch size and maximum blob size should be configured to ensure that the deserializer does not end up using more memory than expected.

Since the Spooling Directory Source is also a part of the flume-ng-core artifact, make sure you add the flume-ng-core artifact to your deserializer’s pom.xml file, as shown in Example 3-6. Custom deserializers can be deployed to a Flume agent using the plugins.d framework shown in “Deploying Custom Code”.

Spooling Directory Source Performance

A Spooling Directory Source is I/O-bound. To avoid complicating deserializer implementations, the source was specifically designed to be single-threaded. This means that it is possible that the performance could be improved by using more threads to read the data and use more of the available CPUs. One way of improving performance of files being read is to write the files alternately to different directories and have one Spooling Directory Source process each of the directories (and write to the same channel, if all the data is going to the same destination). This means more threads read data from the disk and more of the CPUs can be utilized for deserialization.
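On the producing side, writing files alternately to different directories can be as simple as a round-robin chooser. This is only a sketch: SpoolDirRotation is a hypothetical class, the directory names in main are made up, and each directory is assumed to be watched by its own Spooling Directory Source.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: spreads finished files over several spool dirs. */
final class SpoolDirRotation {
  private final List<Path> spoolDirs;
  private final AtomicLong filesAssigned = new AtomicLong();

  SpoolDirRotation(List<Path> spoolDirs) {
    if (spoolDirs.isEmpty()) {
      throw new IllegalArgumentException("At least one directory is required");
    }
    this.spoolDirs = new ArrayList<>(spoolDirs);
  }

  /** Returns the directory that the next finished file should go to. */
  Path nextDirectory() {
    // The counter is atomic, so several writer threads can share it.
    int index = (int) (filesAssigned.getAndIncrement() % spoolDirs.size());
    return spoolDirs.get(index);
  }

  public static void main(String[] args) {
    SpoolDirRotation rotation = new SpoolDirRotation(Arrays.asList(
        Paths.get("/data/flume/spool-a"), Paths.get("/data/flume/spool-b")));
    for (int i = 0; i < 4; i++) {
      System.out.println(rotation.nextDirectory());
    }
  }
}
```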

Syslog Sources

Syslog is a well-known format that is used by many applications to write log messages. Flume integrates with syslog and can receive syslog messages over both TCP and UDP. Flume provides two syslog sources: the Syslog UDP Source and the Multiport Syslog Source. The Syslog UDP Source receives syslog messages over UDP, while the Multiport Syslog Source can receive syslog messages on several ports over TCP. Both sources can parse the syslog messages and extract several fields into Flume event headers, which can be used in HDFS Sink bucketing. If the syslog messages do not conform to the Syslog RFCs, RFC-3164 or RFC-5424, the events will contain a header with the key flume.syslog.status with the value Invalid.

The Syslog UDP Source considers an entire UDP datagram to be one syslog event and converts it to a single Flume event, while the Multiport Syslog Source creates a new message each time it encounters a newline (\n) character. These sources create two headers, Facility and Severity, in each Flume event’s headers to indicate the facility and severity of each message. This can be used in bucketing or with the multiplexing channel selector (discussed in Chapter 6).
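In the syslog RFCs, facility and severity are both encoded in the priority value at the start of each message: the priority equals the facility multiplied by 8, plus the severity. The sketch below decodes them. SyslogPriority is a hypothetical helper, and it makes no claim about the exact form in which Flume writes the resulting header values.

```java
/** Hypothetical helper: decodes the <PRI> part of a syslog message. */
final class SyslogPriority {

  /** Extracts the numeric priority from a message such as "<34>...". */
  static int parsePri(String message) {
    int end = message.indexOf('>');
    // The priority is one to three digits between '<' and '>'.
    if (!message.startsWith("<") || end < 2 || end > 4) {
      throw new IllegalArgumentException("No priority found: " + message);
    }
    return Integer.parseInt(message.substring(1, end));
  }

  /** Facility is the priority divided by 8. */
  static int facility(int pri) {
    return pri / 8;
  }

  /** Severity is the remainder after dividing the priority by 8. */
  static int severity(int pri) {
    return pri % 8;
  }

  public static void main(String[] args) {
    int pri = parsePri("<34>Oct 11 22:14:15 mymachine su: 'su root' failed");
    System.out.println("facility=" + facility(pri)
        + " severity=" + severity(pri));
  }
}
```

For a priority of 34, this yields facility 4 and severity 2.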

Table 3-9 lists the configuration parameters that are common to both sources.

Table 3-9. Syslog Source configuration
Parameter Default Description

type

-

The Syslog UDP Source type is syslogudp and the Multiport Syslog Source type is multiport_syslogtcp.

host

-

The IP address/hostname to bind to. To bind to all interfaces on the machine, use 0.0.0.0.

keepFields

false

If set to true, all fields from the syslog message are left in the event body in addition to having them in the event headers.

The Syslog UDP Source can be enabled using the syslogudp alias, while the Multiport Syslog Source can be enabled using the multiport_syslogtcp alias. Both sources require the user to specify the hostname to bind to as the value of the host parameter. To bind to all interfaces on the machine, use 0.0.0.0 as the value of host. If keepFields is set to true, fields from the syslog message that are normally moved to the event headers (or removed altogether), such as Priority, Timestamp, and Hostname, are left in the body of the event as well as copied to the event headers. The Multiport Syslog Source also allows syslog messages to be encoded in different character sets on each port it receives the data on. This allows a single source to essentially receive data from various sources, each encoding the message in a different character set.

As well as the common parameters, the Syslog UDP Source has only one additional parameter (see Table 3-10).

Table 3-10. Syslog UDP Source configuration
Parameter Default Description

port

-

The port to bind to.

The port parameter is used to specify which port the source should bind to.

The Multiport Syslog Source can bind to several ports on the host. In addition to the common parameters, the Multiport Syslog Source defines the parameters listed in Table 3-11.

Table 3-11. Multiport Syslog Source configuration
Parameter Default Description

ports

-

A space-separated list of ports to bind to.

portHeader

-

The port on which the event was received is included in the headers as the value of the key specified by this header. If this parameter is not configured, the port information is not included.

charset.default

UTF-8

The default character set to use.

charset.port.<port>

-

The character set to use for a specific port.

eventSize

2500

The maximum size of a single event, in bytes.

batchSize

100

The number of events to buffer in memory before writing to the channel.

readBufferSize

1024

The buffer size the underlying server should use.

numProcessors

-

The number of processors to use. This allows the source to increase the degree of parallelism.

The Multiport Syslog Source can receive data on multiple ports. The ports should be listed separated by spaces as the value of the ports parameter. Each event received can be annotated with the port number on which the event was received. The value of the configuration parameter portHeader is used as the key for the header in the Flume event, and the value of this header is the port number.

The source also allows syslog messages to be encoded in different character sets on each port it receives data on. To configure the character set per port, use the charset.port. prefix followed by the port number as the configuration parameter, with the value being the character set name. The default character set can be set using the charset.default parameter. The value of this parameter is used when a specific character set has not been set for a port.
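The fallback from a per-port character set to the default can be sketched as a simple lookup. PortCharsets is a hypothetical class that reads the same parameter names from a plain map. It only illustrates the resolution rule and is not how the source is implemented.

```java
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: resolves the character set to use for a port. */
final class PortCharsets {
  private static final String DEFAULT_KEY = "charset.default";
  private static final String PORT_PREFIX = "charset.port.";

  private final Charset defaultCharset;
  private final Map<Integer, Charset> perPort = new HashMap<>();

  PortCharsets(Map<String, String> config) {
    defaultCharset =
        Charset.forName(config.getOrDefault(DEFAULT_KEY, "UTF-8"));
    for (Map.Entry<String, String> entry : config.entrySet()) {
      if (entry.getKey().startsWith(PORT_PREFIX)) {
        // The text after the prefix is the port number.
        int port = Integer.parseInt(
            entry.getKey().substring(PORT_PREFIX.length()));
        perPort.put(port, Charset.forName(entry.getValue()));
      }
    }
  }

  /** Returns the port-specific charset, or the default if none is set. */
  Charset forPort(int port) {
    return perPort.getOrDefault(port, defaultCharset);
  }
}
```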

As mentioned earlier, the source assumes that each event is delimited by a newline character. Sometimes, it is important to also ensure that each event does not go over some fixed size. This maximum size for each event can be set using the eventSize parameter, which is represented in bytes. If the event’s size is greater than this value, the event is truncated to this length, and a header with the key flume.syslog.status is inserted with the value Incomplete. The Multiport Syslog Source also buffers events in memory to avoid affecting channel performance. The batch size can be specified using the batchSize parameter.
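The interaction between newline delimiting and eventSize can be sketched as follows. NewlineFramer is a hypothetical class, not the source’s actual implementation. In particular, what happens to the bytes beyond the limit is an assumption here: they are simply dropped, and the frame is flagged so that a caller could set the Incomplete status header.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: splits received bytes into newline-delimited events. */
final class NewlineFramer {

  /** One event body, plus whether it was cut at the size limit. */
  static final class Frame {
    final byte[] body;
    final boolean incomplete;

    Frame(byte[] body, boolean incomplete) {
      this.body = body;
      this.incomplete = incomplete;
    }
  }

  /**
   * Emits one frame per newline-terminated message. Bytes after the
   * last newline belong to a message that is still arriving, so they
   * are not emitted.
   */
  static List<Frame> split(byte[] input, int eventSize) {
    List<Frame> frames = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < input.length; i++) {
      if (input[i] == '\n') {
        int length = i - start;
        // Assumption: bytes beyond eventSize are dropped.
        int kept = Math.min(length, eventSize);
        frames.add(new Frame(
            Arrays.copyOfRange(input, start, start + kept),
            length > eventSize));
        start = i + 1;
      }
    }
    return frames;
  }
}
```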

This source uses a framework called Apache MINA to receive messages. The MINA server uses an internal buffer while reading the data from the network. The size of this buffer is configurable via the configuration parameter readBufferSize, specified in bytes. MINA also supports heavy parallelism. To configure the degree of parallelism, the number of processors that can be used can be passed in using the numProcessors parameter. If this is not defined, the value of the number of processors to be used is autodetected. For each processor that can be used, MINA will spawn up to two threads. To reduce thread usage (not often required), reduce the value specified by numProcessors.

Example 3-10 shows an example of a Multiport Syslog Source configured to receive data on all interfaces and three ports, 4353, 4565, and 4553. The source is configured to process the data received on port 4565 as ISO-8859-1 charset encoded, while all other data received on the other two ports is assumed to be encoded as UTF-8. The source also adds a header to each event with the key port, with the port number where the event was received as the value. The source writes events in batches of 1,000 events each, with a maximum event size of 1,092 bytes. The source also configures MINA to use a buffer of 2,048 bytes while receiving data.

Example 3-10. Multiport Syslog Source configuration example
agent.sources = syslog
agent.channels = memChannel

agent.sources.syslog.type = multiport_syslogtcp
agent.sources.syslog.channels = memChannel

# Bind to all interfaces
agent.sources.syslog.host = 0.0.0.0
agent.sources.syslog.ports = 4353 4565 4553

agent.sources.syslog.charset.default = UTF-8
agent.sources.syslog.charset.port.4565 = ISO-8859-1

agent.sources.syslog.batchSize = 1000
agent.sources.syslog.portHeader = port
agent.sources.syslog.readBufferSize = 2048
agent.sources.syslog.eventSize = 1092

agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 10000
agent.channels.memChannel.transactionCapacity = 5000

Syslog Data Loss

Syslog is generally assumed to be a “fire and forget” protocol. The RFCs do not define a way of sending an acknowledgment from the receiver to the sender, nor do they specify a way to retransmit a message after a timeout. Therefore, if the Flume source is unable to write the events to the channel, or if there is some network disruption causing the message to be lost (especially in the UDP case), there is no real way for Flume to inform the sender of failure or for the sender to know that there is an error condition and resend the message. This essentially causes silent data loss, with no possibility of recovering the lost data. As a result, Flume’s no-data-loss guarantee does not hold for syslog, and it is recommended to use syslog only if there is no other option and the Flume RPC client or the embedded agent cannot be used at all.

Exec Source

An Exec Source executes a command configured by the user and generates events based on the standard output of the command. It can also read the error stream of the command, convert that data into Flume events, and write them to the channel. The source expects the command to produce data continuously, and it runs as long as the process started by the command runs, continuously reading the output stream of the process (and the error stream, if configured).

Each line in the output stream is then encoded as a byte array. The encoding to be used is configurable, and defaults to UTF-8. Each byte array generated as a result is then used as the body of a Flume event. For performance reasons, the source then batches a preconfigured number of events (or until a timeout) and writes them out to the channel. If the channel is full, the source can be configured to stop reading the output and error streams of the process (thus blocking the process temporarily) or to drop the current batch and continue reading the output and error streams (thus letting the process continue). Table 3-12 illustrates the configuration parameters for an Exec Source.

Table 3-12. Exec Source configuration
Config parameter  Default value  Description
type              -              The alias for an Exec Source is exec. The FQCN is org.apache.flume.source.ExecSource (case sensitive).
command           -              The command that the source should run.
restart           false          If set to true, the source will attempt to restart the process if it dies.
restartThrottle   10000          The time in milliseconds to wait before the process is restarted. This parameter has no effect if restart is false or is not set.
logStdErr         false          If set to true, the error stream of the process is also read by the source and converted into Flume events.
batchSize         20             The maximum number of events to buffer before writing out to the channel(s) configured.
batchTimeout      3000           The time in milliseconds to wait before writing buffered events to the channel(s) configured.
charset           UTF-8          The character set to use to encode the output and error streams into Flume events.
shell             -              The shell or command processor to be used to run the command.

The command that is to be run is passed in through the command parameter. The Exec Source can be configured to restart the process started by the command, by setting the restart parameter to true. To make sure there is a sufficient time difference between executions of the command, restartThrottle can be set. Once the process dies, the source will wait for this time interval before running the command again.
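
The interaction between restart and restartThrottle can be sketched as a loop that reruns the command after a pause. This is an illustrative sketch under stated assumptions, not the Exec Source's implementation: RestartLoop is a hypothetical class, the command is modeled as a Runnable that returns when the process exits, and maxRuns exists only so that the sketch terminates.

```java
// Hypothetical illustration of restart and restartThrottle; not Flume code.
final class RestartLoop {
  // Runs the command once, or repeatedly when restart is true.
  // maxRuns bounds the loop so that the sketch terminates; a real source
  // would keep restarting the process until the source itself is stopped.
  static int run(Runnable command, boolean restart,
                 long restartThrottleMillis, int maxRuns) {
    int runs = 0;
    while (runs < maxRuns) {
      command.run();                 // returns when the "process" exits
      runs++;
      if (!restart || runs == maxRuns) {
        break;                       // restart disabled, or demo bound reached
      }
      try {
        Thread.sleep(restartThrottleMillis); // wait before running again
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return runs;
  }
}
```

With restart set to false the command runs exactly once, regardless of the throttle value, which matches the note in Table 3-12 that restartThrottle has no effect in that case.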

Exec Source buffers data to ensure good performance when used in conjunction with the File Channel. As mentioned in “File Channel”, the performance of the channel is better when the number of events per transaction is reasonably high. The batchSize parameter in the Exec Source controls the size of a batch. The source can also be configured to write out the buffered events at the end of a configured time interval, which can be set using the batchTimeout parameter. If both batchSize and batchTimeout are set, the batch is written to the channel as soon as the batch size or batch timeout is met.
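
The "batch size or batch timeout, whichever comes first" rule can be sketched as follows. This is a simplified model and not the Exec Source's internal code: EventBatcher is a hypothetical class, the timeout is measured from the first buffered event (an assumption), and the current time is passed in explicitly so that the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batcher: flushes when batchSize events are buffered or when
// batchTimeoutMillis has elapsed since the first buffered event.
final class EventBatcher {
  private final int batchSize;
  private final long batchTimeoutMillis;
  private final List<byte[]> buffer = new ArrayList<>();
  private long firstEventAt = -1;

  EventBatcher(int batchSize, long batchTimeoutMillis) {
    this.batchSize = batchSize;
    this.batchTimeoutMillis = batchTimeoutMillis;
  }

  // Buffers one event body; returns the batch to write if a limit was
  // reached, or null if the event was only buffered.
  List<byte[]> add(byte[] body, long nowMillis) {
    if (buffer.isEmpty()) {
      firstEventAt = nowMillis;
    }
    buffer.add(body);
    return flushIfDue(nowMillis);
  }

  // Meant to be called periodically so a quiet stream still flushes on timeout.
  List<byte[]> flushIfDue(long nowMillis) {
    boolean full = buffer.size() >= batchSize;
    boolean timedOut = !buffer.isEmpty()
        && nowMillis - firstEventAt >= batchTimeoutMillis;
    if (!full && !timedOut) {
      return null;
    }
    List<byte[]> batch = new ArrayList<>(buffer);
    buffer.clear();
    return batch;
  }
}
```

Whatever is still sitting in the buffer when the process hosting it dies is lost, which is why larger batches trade some durability for throughput.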

The Exec Source can be set to run the configured command in a separate shell, which may be different from the one the Flume process is running in. To run the process in a different shell, pass the full path of the shell executable to the shell parameter. If the command requires shell features like substitution of wildcards or pipes, the shell parameter must be set, since Flume will not perform substitution.

Here is an example of a configuration file where the Exec Source is used for running a complex command using the shell parameter:

agent.sources = execSrc
agent.channels = memChannel

agent.sources.execSrc.type = exec
agent.sources.execSrc.shell = /bin/bash -c
agent.sources.execSrc.command = tail -F /var/log/flume/flume.log | grep "error:"

agent.sources.execSrc.channels = memChannel

# Initializes a memory channel with default configuration
agent.channels.memChannel.type = memory
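
To see why the shell parameter matters for the pipe in this configuration, consider how an argument list for the process might be built. The sketch below is an assumption about the general approach, not a copy of Flume's code, and ShellCommand is a hypothetical class: with a shell, the whole command is handed over as one argument so that the shell interprets the pipe; without one, the command is merely split on whitespace and the pipe character becomes an ordinary argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing how shell and command could be combined.
final class ShellCommand {
  static List<String> toArgs(String shell, String command) {
    if (shell == null || shell.trim().isEmpty()) {
      // No shell: naive whitespace split; "|" and "*" are passed literally.
      return new ArrayList<>(Arrays.asList(command.trim().split("\\s+")));
    }
    // Shell configured: e.g. "/bin/bash -c" becomes ["/bin/bash", "-c"],
    // and the entire command string is the single final argument.
    List<String> args =
        new ArrayList<>(Arrays.asList(shell.trim().split("\\s+")));
    args.add(command);
    return args;
  }
}
```

A list built this way could be handed to java.lang.ProcessBuilder, whose constructor accepts a List of program arguments.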

Possibility of Data Loss with Exec Source

The Exec Source is an example of an asynchronous source, which cannot inform the data producer if there is a failure. As a result, restarting the agent or the machine can result in data loss, as explained here.

The Exec Source is most commonly used to tail files from within Flume. Tailing a file using the Exec Source with the command tail -F will get the data into Flume in near real time, but there is risk of data loss. If the Flume agent dies or the machine restarts, the Exec Source will run the command when it starts up; in this case it will run tail -F <file_name>. Since the tail command will only pull in new data written to the file, any data written to the file between the agent’s death and the time the source started up is lost. For this reason, it is recommended to use the Spooling Directory Source discussed earlier in this chapter to handle data written into files. Though slightly more restrictive, this source will not lose data as it tracks the data being read from the file.

Even when used with some other command, the Exec Source does buffer as many events as the batch size before writing the events to the channel. These events will also be lost if the agent or machine restarts before the batch timeout or batch size is reached.

JMS Source

Flume comes bundled with a source that can fetch data from a Java Message Service queue or topic. Using the JMS Source, it is possible to accept messages from any messaging system that supports JMS, such as ActiveMQ. Installing and using the JMS Source is slightly trickier than most other sources. Because the source can integrate with multiple messaging systems, the client libraries supplied by the respective system must be installed into the plugins.d directory, as discussed in “Deploying Custom Code”.

The configuration parameters for the JMS Source are shown in Table 3-13.

Table 3-13. JMS Source configuration
Config parameter       Default value      Description
type                   -                  The alias for the JMS Source is jms. The FQCN is org.apache.flume.source.jms.JMSSource (case sensitive).
initialContextFactory  -                  The class name of the vendor’s initial context factory. An example, in the case of ActiveMQ, would be org.apache.activemq.jndi.ActiveMQInitialContextFactory.
destinationName        -                  The name of the JMS destination from which the messages are consumed.
destinationType        -                  The type of the JMS destination: queue or topic.
providerURL            -                  The URL of the JMS broker.
connectionFactory      ConnectionFactory  The JNDI name the connection factory appears as.
messageSelector        -                  The JMS message selector expression used to filter the messages, if needed.
userName               -                  The username to log in to the JMS provider.
passwordFile           -                  The file that contains the password to the JMS provider.
batchSize              100                The number of events to write to the channel per transaction.
converter.type         DEFAULT            The FQCN of a class that implements JMSMessageConverter. See “Converting JMS Messages into Flume Events*”.
converter.charset      UTF-8              The charset that the default converter should use. This parameter may not be accepted by other converters.
converter.*            -                  Other configuration parameters to be passed to the converter.

Vendor-supplied libraries to communicate with the JMS broker must be dropped into Flume’s plugins.d directory. The library’s initial context factory and connection factory classes should be passed in as values to the configuration parameters initialContextFactory and connectionFactory. This information should be available in the documentation of the vendor-supplied libraries. The value of connectionFactory defaults to ConnectionFactory, which should work for popular JMS systems like ActiveMQ. Other JMS-related parameters, such as the destination name, destination type, and message selector (if messages need to be filtered), should also be passed in through the configuration file. The source also requires the URL of the JMS broker as the value of the providerURL parameter.

The source can log in to secure JMS brokers using the username specified by the userName and the password specified in a file whose path is specified by the passwordFile parameter.
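
Keeping the password in a separate file means it does not appear in the agent's configuration file. A minimal sketch of reading such a file is shown below; PasswordFiles is a hypothetical helper, and trimming surrounding whitespace (such as a trailing newline left by an editor) is an assumption of this sketch rather than documented behavior.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: loads a password from the file named by passwordFile.
final class PasswordFiles {
  // Reads the file as UTF-8 and strips leading and trailing whitespace.
  static String read(Path passwordFile) {
    try {
      byte[] raw = Files.readAllBytes(passwordFile);
      return new String(raw, StandardCharsets.UTF_8).trim();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The file itself should be readable only by the user that runs the Flume agent.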

By default, this source will pull up to 100 messages from the broker per JMS session, but this can be adjusted using the batchSize parameter.

The JMS Source can convert data from JMS-style messages into Flume events via a converter. The default converter, which is used if no converter is specified in the configuration, can handle most standard JMS messages. If custom or proprietary formats are used, a custom converter can be deployed using the converter.type parameter, whose value should be the FQCN of the converter class. Any parameters that have to be passed to a converter can be passed in using the converter. prefix. For example, to use a character set other than the default UTF-8 character set with the default converter, set the value of converter.charset to the standard name of the character set, like ISO-8859-1. “Converting JMS Messages into Flume Events*” discusses in detail how to write custom converters. Example 3-11 presents a sample JMS Source configuration.

Example 3-11. JMS Source configuration example
agent.sources = jmsSrc
agent.channels = memChannel

agent.sources.jmsSrc.type = jms
agent.sources.jmsSrc.channels = memChannel

# Initial context factory supplied by the ActiveMQ client library
agent.sources.jmsSrc.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
agent.sources.jmsSrc.destinationName = UsingFlume

agent.sources.jmsSrc.providerURL = tcp://usingflume.oreilly.com:61616
agent.sources.jmsSrc.destinationType = QUEUE

agent.sources.jmsSrc.batchSize = 1000
agent.sources.jmsSrc.converter.type = usingflume.ch03.JsonMessageConverter
agent.sources.jmsSrc.converter.charset = iso-8859-1

agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 10000
agent.channels.memChannel.transactionCapacity = 5000

This configuration file configures the source to pull data from the JMS queue named UsingFlume on an ActiveMQ broker running at usingflume.oreilly.com:61616. The queue information is passed in via the destinationName, destinationType, and providerURL parameters. The initial context factory is set as org.apache.activemq.jndi.ActiveMQInitialContextFactory, as per ActiveMQ documentation. The JsonMessageConverter is used with the ISO-8859-1 character set.
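
The way parameters carrying the converter. prefix reach the converter can be pictured as a prefix filter over the source's properties. The sketch below illustrates the idea with plain maps; SubProperties is a hypothetical class and this is not the code Flume uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: selects the properties meant for a sub-component.
final class SubProperties {
  // Returns the entries whose key starts with prefix, with the prefix removed.
  static Map<String, String> withPrefix(Map<String, String> props,
                                        String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
      String key = e.getKey();
      if (key.startsWith(prefix) && key.length() > prefix.length()) {
        result.put(key.substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }
}
```

Applied to the source's properties with the prefix converter., the converter would see charset and type, but not source-level settings such as batchSize.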

Converting JMS Messages into Flume Events*

The JMS Source can be configured to use custom code to convert JMS messages into Flume events, much like the HTTP Source’s handler. This makes the JMS Source extremely flexible, and allows the user to parse data in proprietary formats in the JMS message. Flume comes packaged with a JMS Message Converter that supports default JMS formats. The JMS Source can pass configuration into the message converter, just like the HTTP Source. This configuration can be used to set up any initial configuration or state required for the converter. All of the JMS transaction handling is done by the source; the converter need not worry about any of that.

The default JMS Message Converter that comes packaged with Flume converts individual JMS messages into Flume events based on their format and content. A JMS BytesMessage is simply read, and the bytes read from the message are placed into the body of a Flume event. For a JMS TextMessage, the converter encodes the text into a byte array and places it into the body of a Flume event. The encoding to be used is configurable through the source configuration. If an ObjectMessage is read off the JMS queue, the converter writes the object through an ObjectOutputStream that wraps a ByteArrayOutputStream. The bytes are then read from this stream and set as the Flume event’s body. In most cases, this is what users need.
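
The three conversions can be sketched with plain Java types standing in for the JMS message classes. This is an illustration of the description above, not the default converter's source code; BodyEncoder is a hypothetical class, and byte[], String, and Serializable stand in for the payloads of bytes, text, and object messages, respectively.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

// Hypothetical stand-in for the payload-to-body step of a converter.
final class BodyEncoder {
  static byte[] toBody(Object payload, Charset charset) {
    if (payload instanceof byte[]) {
      return ((byte[]) payload).clone();          // bytes: used as-is
    }
    if (payload instanceof String) {
      return ((String) payload).getBytes(charset); // text: encode with charset
    }
    if (payload instanceof Serializable) {
      // object: Java serialization into an in-memory byte stream
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(payload);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return bytes.toByteArray();
    }
    throw new IllegalArgumentException("Unsupported payload type");
  }
}
```

Note that a body produced from an object can only be read back by a consumer that has the object's class available, which is one reason a custom converter is sometimes preferable.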

Sometimes, though, it might make more sense to parse out any schemas in the message and convert them into a format that can be serialized at the terminal sink more easily. For example, if there are several different applications writing to the JMS queue in different serialization formats like JSON or XML, then a converter can be written to convert these into a unified format that can be parsed more easily at the terminal sink.

Converters must implement the JMSMessageConverter interface, which is shown in Example 3-12. The JMS Source instantiates a JMSMessageConverter$Builder class, and then passes the configuration via a Context instance to the Builder’s build method, which is expected to return a fully configured converter instance.

Example 3-12. JMS converter interface
package org.apache.flume.source.jms;

import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.flume.Context;
import org.apache.flume.Event;

public interface JMSMessageConverter {
  public List<Event> convert(Message message)
    throws JMSException;
  /**
   * Implementors of JMSMessageConverter must either provide
   * a suitable builder or implement the Configurable interface.
   */
  public interface Builder {
    public JMSMessageConverter build(Context context);
  }
}

Example 3-13 shows an example of a JMS Message Converter that reads the JMS messages formatted as JSON and converts them into Flume events. Converters can implement Configurable to accept configuration, and the JMS Source passes any parameters specified with the converter. prefix to the converter. This converter accepts a configuration that tells the converter which charset to use to convert the JSON events to Flume events.

Example 3-13. An example of a JMS Message Converter
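package usingflume.ch03;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.event.JSONEvent;
import org.apache.flume.source.jms.JMSMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;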

public class JsonMessageConverter implements JMSMessageConverter,
  Configurable {

  private static final Logger LOGGER =
    LoggerFactory.getLogger(JsonMessageConverter.class);
  private final Type listType
    = new TypeToken<List<JSONEvent>>() {
  }.getType();
  private final Gson gson
    = new GsonBuilder().disableHtmlEscaping().create();
  private String charset = "UTF-8";

  @Override
  public List<Event> convert(javax.jms.Message message)
    throws JMSException {

    Preconditions.checkState(message instanceof TextMessage,
      "Expected a text message, but the message received " +
        "was not Text");
    List<JSONEvent> events =
      gson.fromJson(((TextMessage) message).getText(), listType);
    return convertToNormalEvents(events);
  }

  private List<Event> convertToNormalEvents(List<JSONEvent> events) {
    List<Event> newEvents = new ArrayList<Event>(events.size());
    for(JSONEvent e : events) {
      e.setCharset(charset);
      newEvents.add(EventBuilder.withBody(e.getBody(),
        e.getHeaders()));
    }
    return newEvents;
  }

  @Override
  public void configure(Context context) {
    try {
      charset = context.getString("charset", "UTF-8");
    } catch (Exception ex) {
      LOGGER.warn("Charset not found. Using UTF-8 instead", ex);
      charset = "UTF-8";
    }

  }
}

The converter shown in Example 3-13 takes a message that it expects to be a JMS TextMessage whose text is JSON, and parses that text into a list of Flume JSON events. Once the JSON events are created, a list of “simple” Flume events is created from them to avoid the additional overhead of converting the string to a byte array each time it is read. To configure the JMS Source with this converter, Example 3-11 can be used.

To deploy a custom converter, make sure the JAR file containing the class is deployed in the plugins.d directory and the converter.type parameter specifies the FQCN of the class that is to be used. While writing your converter, include the flume-jms-source artifact in your application’s pom.xml file:

    <dependency>
      <groupId>org.apache.flume.flume-ng-sources</groupId>
      <artifactId>flume-jms-source</artifactId>
      <version>1.5.0</version>
    </dependency>

Custom converters can be installed to the plugins.d directory as explained in “Deploying Custom Code”.

Writing Your Own Sources*

Sources are the points of entry for data into a Flume agent. It is likely that users will have custom or proprietary communication formats that need to be used to write data into Flume. This is often more efficient and easier than pushing data via the Flume SDK. To integrate with other systems, users can write their own Flume sources and deploy them, using Flume’s plugins.d mechanism.

Each source “generates” events and then forwards the events to the channel processor for the source. Each time an event is generated by the source, the source can either write it to the channel processor by calling the channel processor’s processEvent method or wait for a batch of events to be generated and then send them to the channel processor using the channel processor’s processEventBatch method. It is almost always better to use the processEventBatch method with a list of events. Each processEventBatch call starts a transaction with each of the channels and writes the entire batch in one transaction, and then commits it. processEvent, on the other hand, creates transactions of just one event, which can cause a severe overhead that affects the channels’ performance. This is why it is recommended that sources use processEventBatch unless each event is known to be large (on the order of hundreds of kilobytes to a few megabytes). To get access to the channel processor for a source, the source can call the getChannelProcessor method defined in the AbstractSource class.
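
The difference in transaction overhead can be made concrete with a toy model. The classes below are hypothetical stand-ins and do not use the Flume API: CountingChannel only counts how many transactions were committed, and the two static methods imitate the per-event and per-batch styles of writing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy stand-in for a channel; counts committed transactions.
final class CountingChannel {
  private final List<String> stored = new ArrayList<>();
  private int transactions = 0;

  // One transaction per call, regardless of how many events it carries.
  void putInOneTransaction(List<String> events) {
    stored.addAll(events);
    transactions++;
  }

  int transactions() { return transactions; }
  int size() { return stored.size(); }
}

final class BatchingDemo {
  // processEvent style: every event pays for its own transaction.
  static void writeOneByOne(CountingChannel channel, List<String> events) {
    for (String event : events) {
      channel.putInOneTransaction(Collections.singletonList(event));
    }
  }

  // processEventBatch style: the whole list shares a single transaction.
  static void writeAsBatch(CountingChannel channel, List<String> events) {
    channel.putInOneTransaction(events);
  }
}
```

For 100 events, the first style commits 100 transactions and the second commits one. With a channel that syncs to disk on every commit, that difference dominates the cost of writing small events.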

If data coming from an external source requires that an acknowledgment be sent, it is important that this be sent only after the processEventBatch method returns. If a commit to one of the channels fails, we must inform the original data source that the data needs to be sent again. Sending the ACK after the processEventBatch method returns avoids the problem. ChannelExceptions thrown by this method can be caught and a failure can be reported to the data source so that the data can be sent again.
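
The ordering rule, acknowledge only after the write has returned, can be sketched without the Flume API. AckAfterCommit and its BatchWriter interface are hypothetical; BatchWriter stands in for the channel processor's batch write, and the failure is modeled as a RuntimeException, which is how Flume's ChannelException behaves.

```java
import java.util.List;

// Hypothetical request handler: replies ACK only after the batch is written.
final class AckAfterCommit {
  // Stand-in for the channel processor's batch write; throws on failure.
  interface BatchWriter {
    void write(List<String> events);
  }

  // Returns the reply to send back to the data producer.
  static String handle(List<String> events, BatchWriter writer) {
    try {
      writer.write(events);   // the commit happens inside this call
    } catch (RuntimeException e) {
      return "NACK";          // not committed: ask the producer to resend
    }
    return "ACK";             // reached only after a successful return
  }
}
```

Sending the acknowledgment before the write, or from a different thread that does not wait for it, would tell the producer that data is safe when it may not be.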

The channel processor for each source is created and set up by the Flume framework, so the source does not need to handle the creation or configuration of the channel processor. In this section, we will take a look at how to write custom sources.

All classes described as dependencies in this section are part of the flume-ng-core artifact or its dependencies. Example 3-6 describes how to include this artifact in your plug-in’s pom.xml file. Custom sources can be deployed to Flume just like any other plug-in, as shown in “Deploying Custom Code”.

Event-Driven and Pollable Sources

Each source is run by its own SourceRunner, a framework component that manages the lifecycle of the source. Flume has two types of sources: event-driven sources and pollable sources. Based on the type of source, the Flume framework creates an EventDrivenSourceRunner or a PollableSourceRunner to run the source.

Developing pollable sources

Pollable sources do not run their own threads; instead, they are controlled by the Flume framework, which calls the source’s process method in a loop. These sources typically extend the AbstractPollableSource class and implement its doProcess method, to which the process method delegates. Because AbstractPollableSource already implements the Configurable interface, a pollable source can accept configuration from the user via the Flume configuration system by implementing the doConfigure method.

Pollable sources are sources that run a loop to generate data or poll an external system to receive data, rather than running a server. Once the configuration provider instantiates and configures a pollable source, the Flume framework creates a PollableSourceRunner to run the source.

The Flume framework runs a thread for each pollable source that repeatedly calls the process method. Each time the process method is called, the source “generates” events and passes them to the channel processor. The source is responsible for informing the framework whether it was successfully able to generate data or not. If the source was successfully able to generate events, then the source returns PollableSource.Status.READY to the runner thread that called it, which will call the process method again immediately. Otherwise, the source returns PollableSource.Status.BACKOFF. In such a case, the Flume framework will initiate a backoff with the process method being called only after a brief timeout, which increases by one second each time the source returns failure (the maximum timeout is five seconds). A pollable source is expected to generate data on its own, or by polling some other source.
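
The backoff schedule described here is a linear increase with a cap. The sketch below encodes only the numbers given in the text (one-second increments, five-second maximum); BackoffPolicy is a hypothetical class, and the constants in a particular Flume version may differ.

```java
// Hypothetical model of the runner's wait between calls to process.
final class BackoffPolicy {
  static final long INCREMENT_MILLIS = 1000; // grows by one second per failure
  static final long MAX_MILLIS = 5000;       // never waits longer than this

  // consecutiveBackoffs is the number of BACKOFF results in a row;
  // it resets to zero whenever the source returns READY.
  static long sleepMillis(int consecutiveBackoffs) {
    if (consecutiveBackoffs <= 0) {
      return 0;                              // READY: call again immediately
    }
    return Math.min(consecutiveBackoffs * INCREMENT_MILLIS, MAX_MILLIS);
  }
}
```

A source that returns BACKOFF whenever it has nothing to deliver therefore settles into being called once every five seconds while idle, and returns to being called immediately as soon as it reports READY.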

If the processEventBatch method throws an exception, the source can catch the exception to report failure to the system from which data is being retrieved. In the case of a JMS Source, this might result in a JMS transaction being rolled back. Otherwise, success can be reported to the external system, like a JMS transaction commit.

A pollable source is extremely simple to write. An example of a pollable source is shown in Example 3-14. The StockTickerSource periodically polls an external service for the prices of a preconfigured set of stock tickers and creates Flume events from the data received. This source simply converts the stock prices into simple strings and creates the event body from the string’s UTF-8 representation.

Example 3-14. An example of a pollable source
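package usingflume.ch03;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractPollableSource;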

public class StockTickerSource extends AbstractPollableSource {

  private static final String CONF_TICKERS = "tickers";
  private static final String CONF_REFRESH_INTERVAL = "refreshInterval";
  private static final int DEFAULT_REFRESH_INTERVAL = 10;

  private int refreshInterval = DEFAULT_REFRESH_INTERVAL;

  private final List<String> tickers = new ArrayList<String>();
  private final QuoteProvider server = new RandomQuoteProvider();

  private volatile long lastPoll = 0;

  @Override
  protected Status doProcess() throws EventDeliveryException {
    Status status = Status.BACKOFF;
    if(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastPoll) >
      refreshInterval) {
      final List<Event> events = new ArrayList<Event>(tickers.size());
      Map<String, Float> prices = server.getQuote(tickers);
      lastPoll = System.currentTimeMillis();
      // Convert each price into ticker = price form in UTF-8 as event body
      for(Map.Entry<String, Float> e: prices.entrySet()) {
        StringBuilder builder = new StringBuilder(e.getKey());
        builder.append(" = ").append(e.getValue());
        events.add(EventBuilder.withBody(builder.toString().getBytes(Charsets
          .UTF_8)));
      }
      getChannelProcessor().processEventBatch(events);
      status = Status.READY;
    }
    return status;
  }

  @Override
  protected void doConfigure(Context context) throws FlumeException {
    refreshInterval = context.getInteger(CONF_REFRESH_INTERVAL,
      DEFAULT_REFRESH_INTERVAL);
    String tickersString = context.getString(CONF_TICKERS);
    Preconditions.checkArgument(tickersString != null && !tickersString
      .isEmpty(), "A list of tickers must be specified");
    tickers.addAll(Arrays.asList(tickersString.split("\\s+")));
  }

  @Override
  protected void doStart() throws FlumeException {
    server.start();
  }

  @Override
  protected void doStop() throws FlumeException {
    server.stop();
  }
}

This source extends the AbstractPollableSource class, whose parent, BasicSourceSemantics, implements the start, stop, and configure methods. Calls to these methods are delegated to the doStart, doStop, and doConfigure methods of this source.
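
This delegation is an instance of the template method pattern: the parent class owns the public lifecycle methods and their bookkeeping, and the subclass supplies only the hooks. The following framework-free sketch shows the shape of that arrangement; LifecycleTemplate and RecordingSource are hypothetical names, and the real BasicSourceSemantics does more than this.

```java
// Hypothetical parent class: fixed lifecycle methods plus subclass hooks.
abstract class LifecycleTemplate {
  private boolean started = false;

  // Called by the framework; subclasses cannot override it.
  public final void start() {
    doStart();          // subclass-specific setup
    started = true;     // bookkeeping stays in the parent
  }

  public final void stop() {
    doStop();           // subclass-specific cleanup
    started = false;
  }

  public final boolean isStarted() { return started; }

  protected abstract void doStart();
  protected abstract void doStop();
}

// Hypothetical subclass that records which hooks were invoked.
final class RecordingSource extends LifecycleTemplate {
  final StringBuilder calls = new StringBuilder();

  @Override protected void doStart() { calls.append("doStart;"); }
  @Override protected void doStop() { calls.append("doStop;"); }
}
```

One consequence of this design is that a hook must not call the parent's lifecycle method itself, since the parent method is what invokes the hook.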

When the agent starts, the framework configures this source. Any configuration must be set up through the doConfigure method that gets called. In this example, the list of tickers for which quotes must be retrieved is fetched from the configuration file. The interval between consecutive polls is also read from the configuration.

After this, the doStart method of this source gets called. This method can be used to set up any network clients. In this case, the service that fetches the stock quotes is started (since this is just an example, the service in this case simply returns random values).

The pollable source runner then calls the process method (implemented in BasicSourceSemantics), which delegates the calls to the doProcess method. The doProcess method generates the events from the data fetched from the external service. When the source successfully translates the incoming quotes into Flume events and writes events out to the channels via the channel processor’s processEventBatch method, it returns Status.READY; otherwise, it returns Status.BACKOFF. In this case, if consecutive calls to doProcess occur in less time than the refresh interval, the source simply returns Status.BACKOFF, indicating that the source runner should wait for a while before calling the process method again.

Eventually, when the agent is stopped, the doStop method gets called. This is where any cleanup can be done, like closing network connections. Once the agent is stopped, the instance may be garbage-collected.

This is a simple example of a pollable source. For a more realistic example, please take a look at the JMS Source in Flume [jms_src_code].

Building event-driven sources

Event-driven sources implement the EventDrivenSource interface, which is simply a marker interface for the Flume framework to choose the SourceRunner implementation to run the source. Event-driven sources usually run their own threads, which are started when the Flume framework calls the start method. Such sources control the rate at which they write data to the channels.

For example, the HTTP Source that comes bundled with Flume is an event-driven source that runs a web server that listens on a particular port. It generates Flume events based on the HTTP requests sent to it, and writes those events out into the channel(s) associated with it. Event-driven sources typically run their own threads or thread pools, which handle event generation and writing those events to the channels. Since these sources respond to some external stimulus, the Flume framework creates a new EventDrivenSourceRunner that simply starts these sources by calling the start method in a new thread and then allows them to manage themselves. When the agent is stopped or reconfigured, the stop method is called to stop the source.

Event-driven sources respond to an external event to produce data, so most sources that receive data from an external entity fall into this category. Most of the sources that come bundled with Flume, like the Avro Source, HTTP Source, and Exec Source, are event driven.

An event-driven source is slightly more complex than a pollable source because this source has to keep track of an external process that produces the data and handle the incoming data without help from the Flume framework. Event-driven sources can also be configured via the configuration file, by implementing the Configurable interface.

To write an event-driven source, extend the AbstractEventDrivenSource class, which already implements the Configurable interface. Alternatively, simply implement the marker interface, EventDrivenSource, which extends the Source interface. As a result, the only methods that the source needs to implement are the ones defined by the Source interface and, if required, the Configurable interface.

The AbstractEventDrivenSource class extends the BasicSourceSemantics class, which implements the start, stop, and configure methods. These calls get delegated to the doStart, doStop, and doConfigure methods of this class.

Once the framework starts the source, it is pretty much on its own, until the framework stops it. It is completely up to the implementor of the source to create threads to handle the external events that generate Flume events.

Example 3-15 shows an example of an event-driven source that accepts data from an external service via Netty-Avro IPC. We will not go into a detailed discussion of the IPC protocol here, but the basic idea is that a handler class, in this case the TransactionSource class, must implement an interface, FlumeCreditCardAuth, which is generated by Avro from the protocol definition file and is shown here:

@namespace("usingflume.ch03")

protocol FlumeCreditCardAuth {

  record CreditCardTransaction {
    string cardNumber;
    string location;
    float amount;
  }

  enum Status {
    OK,
    FAILED
  }

  Status transactionsCompleted(array<CreditCardTransaction> transactions);
}

The Avro compiler generates a class representing the CreditCardTransaction record and an interface that represents the callback whose transactionsCompleted method gets called when the client sends data to this host. The TransactionSource implements this interface. More information about the Avro IDL can be found in the Avro documentation [avro-idl].

Example 3-15. An example of an event-driven source
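package usingflume.ch03;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractEventDrivenSource;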

public class TransactionSource extends AbstractEventDrivenSource implements
  FlumeCreditCardAuth {

  private static final String CONF_HOST = "host";
  private static final String CONF_PORT = "port";
  private static final String DELIMITER = ";";

  private String host;
  private Integer port;
  private Server srv;

  @Override
  protected void doConfigure(Context context) throws FlumeException {
    host = context.getString(CONF_HOST);
    Preconditions.checkArgument(host != null && !host.isEmpty(),
      "Host must be specified");
    port = Preconditions.checkNotNull(context.getInteger(CONF_PORT),
      "Port must be specified");
  }

  @Override
  protected void doStart() throws FlumeException {
    srv = new NettyServer(
      new SpecificResponder(FlumeCreditCardAuth.class, this),
      new InetSocketAddress(host, port));
    // start() in the parent class is what calls doStart(), so it must not
    // be called again from here.
    srv.start();
  }

  @Override
  protected void doStop() throws FlumeException {
    srv.close();
    try {
      srv.join();
    } catch (InterruptedException e) {
      throw new FlumeException("Interrupted while waiting for Netty Server to" +
        " shut down.", e);
    }
  }

  @Override
  public Status transactionsCompleted(List<CreditCardTransaction> transactions)
    throws AvroRemoteException {
    Status status;
    List<Event> events = new ArrayList<Event>(transactions.size());
    for (CreditCardTransaction txn : transactions) {
      StringBuilder builder = new StringBuilder();
      builder.append(txn.getCardNumber()).append(DELIMITER)
        .append(txn.getLocation()).append(DELIMITER)
        .append(txn.getAmount()).append(DELIMITER);
      events.add(EventBuilder.withBody(builder.toString().getBytes(
        Charsets.UTF_8)));
    }
    try {
      getChannelProcessor().processEventBatch(events);
      status = Status.OK;
    } catch (Exception e) {
      status = Status.FAILED;
    }
    return status;
  }
}

The source takes a few configuration parameters, like the hostname and port to which the server must bind. The configure method is called by the Flume framework once the source is initialized and all configuration specific to this source is passed in via the Context instance. All relevant configuration must be validated and saved to relevant fields in this method. For example, the doConfigure method of the TransactionSource verifies that the hostname and port are supplied by the user in the configuration file, throwing an exception if they are not specified.

Once configured, the framework starts the source by calling start, which gets delegated to the doStart method of this class. In this method, the source starts the server process. Once started, the framework does not interact with the source until it is to be stopped. Any servers or threads that are to be started to receive data must be started in this method. In the case of the TransactionSource, a NettyServer is started in this method.

The NettyServer manages several threads that receive data over the network. When a complete Avro message is received, it calls the transactionsCompleted method, to which it passes the data. This method converts the data from the Avro format to Flume events. Being demo code, the translation is simply into a byte representation of a string-formatted version of the data. This conversion can involve any arbitrary logic, though it is a good idea to keep this logic simple, as this code will execute for every event received and will have a direct impact on the performance of the source.

Once the incoming data is converted to Flume events, the entire batch of Flume events is passed to the channel processor via the processEventBatch method. The channel processor handles the transaction processing with the channel(s) and throws an exception if the commit to any channel fails, in which case the source returns Status.FAILED to the sender, which may send the same data again to ensure it is persisted to the Flume channels. If the events are successfully written out, the source returns Status.OK, indicating that the data has been committed to the channel(s).

When the agent stops or is being reconfigured, the stop method gets called. This call gets delegated to the doStop method, which must do any cleanup required. In this case, this method stops the NettyServer instance, and waits for it to terminate before returning. If event-driven sources start threads or create thread pools, or open sockets or files, they must be shut down or closed, respectively, in the stop method.
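
The start and stop responsibilities of an event-driven source can be sketched with a plain thread pool. This example does not use the Flume API: WorkerOwningSource is a hypothetical class, onExternalEvent imitates a callback from some server or listener, and the counter stands in for writing events to a channel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical source that owns its worker threads from start to stop.
final class WorkerOwningSource {
  private final AtomicInteger handled = new AtomicInteger();
  private ExecutorService workers;

  // Everything that receives data is created here, not in the constructor.
  void start() {
    workers = Executors.newFixedThreadPool(2);
  }

  // Imitates an external event arriving; the work runs on the pool.
  void onExternalEvent(String payload) {
    workers.execute(() -> handled.incrementAndGet());
  }

  // Stops accepting work, lets queued work finish, and waits for the pool.
  // Returns true if all workers terminated within the timeout.
  boolean stop(long timeoutMillis) {
    workers.shutdown();
    try {
      return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  int handled() { return handled.get(); }
}
```

Waiting for termination in stop matters during reconfiguration: if the old instance returned while its threads were still running, they could keep holding a port or file that the newly configured instance needs.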

Summary

In this chapter, we discussed the two types of Flume sources (pollable and event driven). We also looked at the various sources that come bundled with Flume, their configuration, their plug-ins, etc. We also looked at some of the best practices with respect to the sources, mostly with reference to batching. In the end, we wrote custom event-driven and pollable sources to get a better understanding of how sources are written.

In the next chapter, we will cover the basics of channels, the two channels that come bundled with Flume, and how to configure them.
