At various points in the book, we have referred to Fluentd’s support for the development of plugins beyond those in the core product. The extensibility of Fluentd has led to a robust ecosystem of third-party plugins that make it easy to capture, filter, and manipulate log events and send them to many different systems and data stores. We have also discussed how custom plugins can connect to and monitor esoteric and legacy solutions when things cannot easily or efficiently be achieved with the existing plugins.
In this chapter, we will walk through the process of creating an input and output plugin that makes use of Redis’s list capability. We will take a closer look at Redis and the rationale behind its use.
Code for the plugin developed here is included with the download of the book or retrievable from our GitHub repository (http://mng.bz/M20W). If you want to build upon what is provided, we encourage you to fork the GitHub repository and develop it as you wish; enhancement opportunities you might like to consider include
Moving to use the RedisTimeSeries feature (more on this in a moment)
Increasing security on the connection to Redis (e.g., using SSL/TLS connections and credentials, such as username and password)
Whatever approach you take, all we ask is for you to acknowledge this book as the starting point in the code.
Redis is an open source, scalable, in-memory storage solution built around key-value pairs. The ability to define details such as a time to live (TTL; the time to hold the data is defined, after which it is automatically deleted) on the data elements held makes Redis an exceptional caching tool. (In appendix E, we have provided plenty of supporting references for Redis in addition to those for Ruby.)
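Since the TTL idea is central to caching, a toy plain-Ruby sketch may help make it concrete. This is purely conceptual and not how Redis is built; Redis implements expiry natively (e.g., the command SET mykey myvalue EX 60 stores a value for 60 seconds):

```ruby
# Conceptual sketch of TTL semantics using a plain Ruby hash.
class TtlCache
  def initialize
    @store = {}
  end

  # Store a value with a lifetime in seconds
  def set(key, value, ttl_seconds)
    @store[key] = { value: value, expires_at: Time.now + ttl_seconds }
  end

  # Return the value, or nil if it was never set or has expired
  def get(key)
    entry = @store[key]
    return nil unless entry
    if Time.now >= entry[:expires_at]
      @store.delete(key)  # lazily evict expired entries on access
      nil
    else
      entry[:value]
    end
  end
end
```

A real cache would also evict expired entries in the background rather than only on access, which is one of the details Redis handles for us.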
Aside from using Redis to help demonstrate plugin development, there are some potential real-world benefits of developing such a plugin:
Rather than using a small, embedded, in-memory cache, we can use a highly resilient open source option that can be far more effective in scaling and replicating the cached data across multiple servers (for more insight into the use of caches, check out https://techterms.com/definition/cache).
It provides an additional option for enabling Fluentd nodes to collaborate efficiently.
It creates integrations with Redis lists, such as the Ruby Resque (Ruby queuing implementation; https://github.com/resque), providing opportunities to support or use additional services.
It provides a means to keep events held in order, as Redis lists support the first in, first out (FIFO) pattern; this allows us to keep log events in sequence (i.e., in time series).
Caching solutions such as Redis operate by storing the data within memory data structures rather than long-term storage like a conventional database. The structures handle the data as key-value pairs. The internal structures can be very sophisticated to allow the cache to quickly locate the correct bit of memory. In addition to this, the data can be handled as a time series of events. The data held can also have a TTL before being automatically removed from the store.
At the time of this writing, Redis Labs has developed the high-performance RedisTimeSeries, which handles data in a time-series format made of two 64-bit structures representing the time and a value (more at https://oss.redislabs.com/redistimeseries). Using it presents some challenges:
There is no support for a Windows native option, and we want to keep things as simple as possible without needing to work through the different approaches to Linux virtualization.
The data structure used by RedisTimeSeries cannot hold the log events in 128 bits, which means that to use this structure, the value part must act as a “foreign key” to another data structure. Taking the example solution to an enterprise-class capability might make this worthwhile, but it adds complexity that relates to Redis, not Fluentd, and therefore would not benefit this book.
Given these points, we will stick with the vanilla Redis features.
Before we start looking at any development activities, let’s simulate the behavior we want to achieve with the plugins. Simulating the behavior will make it easier to relate the development activities back to the solution. To do this, we first need to install Redis, using the details provided in appendix A. Once Redis is installed, we will use its command-line interface (CLI) to simulate the effect of the plugins we will build.
The Redis server needs to be started in its own shell. This can be done with the command

redis-server

Once we see evidence of that process running (Redis reporting to the console), the next step is to start the Redis CLI in a new console window with the command

redis-cli

We can recognize when the command has worked, as the command prompt will look like

127.0.0.1:6379>
We can confirm that we have a proper Redis server connection by entering the command info. This will result in a lot of settings information, including the Redis version, being displayed in the console. With one redis-cli running, we need to repeat the process in a separate shell to simulate the effect of two different processes interacting with the cache. For the rest of this section, we will refer to these as CLI 1 and CLI 2. As we go through the simulation, imagine CLI 1 is acting as a Fluentd output plugin and CLI 2 as a Fluentd input plugin. In CLI 1, we want to run the following list push (lpush) commands:
lpush fluentd '{"tag":"demo","timestamp": 1606076261,"record": {"blah":"blah 1"}}'
lpush fluentd '{"tag":"demo","timestamp": 1606076263,"record": {"blah":"blah 2"}}'
lpush fluentd '{"tag":"demo","timestamp": 1606076267,"record": {"blah":"blah 3"}}'
Each time the command is issued, you will get a response looking like (integer) 1; the number in the response changes as the list depth changes. We can use the list length (llen) command

llen fluentd

to find out how many entries exist in the list. The reference to fluentd in these commands is the name of the list we are using. Notice how we need to use single quotes at the beginning and end of the record so that we can use double quotes in the JSON. Now, in CLI 2, issue the list pop (lpop) command:

lpop fluentd
As a result, you will see the most recent entry pushed from CLI 1 displayed on the CLI 2 console (lpush and lpop both work at the head of the list, so this pairing behaves in a last in, first out manner; pairing lpush with rpop yields first in, first out ordering). Repeat the command lpop fluentd, and the response will include the next remaining record. Add more entries on CLI 1 and continue popping on CLI 2 until you see a nil response. This means the list has been emptied.
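The simulation above can be sketched in plain Ruby, with an Array standing in for the Redis list named fluentd. This is a conceptual stand-in, not the redis client library; it only mirrors the LPUSH/LPOP/RPOP/LLEN semantics we rely on:

```ruby
require 'json'

# An Array stands in for the Redis list. LPUSH adds at the head, LPOP removes
# from the head (so lpush+lpop is LIFO), RPOP removes from the tail
# (lpush+rpop is FIFO), and LLEN reports the length.
list = []
lpush = ->(entry) { list.unshift(entry); list.length }  # returns (integer) n
lpop  = -> { list.shift }
rpop  = -> { list.pop }
llen  = -> { list.length }

lpush.call('{"tag":"demo","timestamp": 1606076261,"record": {"blah":"blah 1"}}')
lpush.call('{"tag":"demo","timestamp": 1606076263,"record": {"blah":"blah 2"}}')
```

Popping with lpop now returns the "blah 2" entry first, while rpop would return "blah 1" first, which is why the input plugin later offers a FIFO option built on rpop.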
In our simulation, CLI 1 is effectively our output plugin behavior, and CLI 2 is our source plugin behavior. This arrangement can help our Fluentd setup by smoothing out spikes in activity. Where Fluentd input and output nodes are transient in nature (as may be the case in a containerized environment), we gain efficient in-memory storage of log events; holding events in memory does not suffer the same I/O performance impacts as storage to disk.
Before we can start any development, some basic preparation is needed. You need to have
Prepared an installation of Fluentd and Ruby, or continue using the existing installation that has been used with the previous chapters (if you would rather start fresh, then all the necessary steps are detailed in appendix A).
Prepared a simple installation of Redis, if you have not already done this (details for this are in appendix A).
Chosen an IDE (integrated development environment) that can support Ruby and installed the IDE and any relevant extensions needed (we will be using Visual Studio Code). As this tends to be a personal choice, we will leave the choice and installation up to you.
Installed the Redis Gem to write Ruby code to talk with Redis (detailed in appendix A).
Established a folder for implementing our plugin; this may align with the folder structure created when you retrieved all the book’s support files.
We do not want to accidentally pollute the Ruby installation and its catalog of gems with our development efforts. One way to achieve this is to have Fluentd pick up our development code directly when we start it, rather than requiring the code to be packaged as a gem and deployed just to test it. There are also ways to comingle development code with the gems’ file system, which we would not recommend.
For the rest of this chapter, we will assume that path starts as c:\myDev\GitHub\LoggingInAction\Chapter9. Once these components are in place, we are ready to start.
As you have seen throughout the book, Fluentd works with a strong foundation of plugins of different types—input, output, filter, parser, and formatter. Fluentd needs to impose some common mechanisms, including naming conventions and folder structure, to make plugins work and tell the core of Fluentd about the different plugins installed. To help with this, Fluentd includes some tools to help make sure plugin development complies with the required conventions. Plugins have a class hierarchy with classes to build upon for each plugin type. Figure 9.1 illustrates this class hierarchy.
Fluentd provides a tool for building the skeleton framework for our plugin. Before this is executed, you need to be in the correct location that you want to use for Fluentd development (e.g., the root folder for your own GitHub project), as the tool will use that folder as the starting point for all the artifacts. Once in the correct location, we can use the command fluent-plugin-generate <plugin type> <plugin name>, where the plugin type represents the type of plugin (input, output, etc.), followed by the plugin’s name. We are going to start with the output plugin. In our case, this makes the complete call:

fluent-plugin-generate output redislist
We have opted for the name redislist, as the plugin works specifically with Redis’s lists capability. As with most data stores, there is already a general-purpose Redis plugin in existence (http://mng.bz/aDP7); you might want to build a plugin to monitor its health as well.
As a result of using the utility, we get a directory structure as illustrated in figure 9.2, which includes some skeleton configuration and code files. As we progress through this chapter, we will address each of the files.
The code we will develop for the plugin goes in the /lib/fluent/plugin folder, as illustrated in figure 9.2, with the Ruby file named after the plugin (e.g., out_redislist.rb). The code we develop in the following few sections will go into this file.
Plugins are taken through a life cycle, with each stage having a method that can be overridden if you need to implement logic for that specific stage. As we will see during the development of our input and output plugins, we do not have to use every stage, but some are essential in nearly every case; for example, configure, start, and shutdown. Each of these states also has a query function to determine whether that is the current state (e.g., after_shutdown?).
Figure 9.3 illustrates the full life cycle that a plugin supports along with the goal of each stage.
We’ve generated the skeleton and taken a moment to look at the life cycle stages; now we can inject any specific behavior required. We can now get down to writing the plugin code that processes the configuration and handles log events, and we’ll cover connecting to and disconnecting from Redis.
With the plugin structured, the first step is to define the configuration attributes the plugin will use. This is achieved by using the config_param object, which takes the values for
The attribute name as it would be used in the configuration file.
The attribute’s data type, which can be one of several supported types, such as string, integer, float, bool, time, hash, or array.
Whether the value should be kept secret when Fluentd performs a dry run or outputs its configuration at startup. By default, this is false and therefore not needed; but to illustrate the behavior, we have included its use.
The ability to define an alias for the attribute name. Aliasing can be helpful when a plugin changes over time.
We need to capture a port number, host address (a DNS name or IP), and the name of the list held by Redis. To do this, we need to define several configuration attributes to be specified in the configuration file. The code is a sequence of desc and config_param statements, with each desc providing the description of the config_param that follows it. Since we know the data types and potential default values, we should exploit the available attributes to simplify data validation and make the user’s experience as simple as possible. As a result, we should end up with code as illustrated in the following listing.
desc "specifies the port to connect to Redis with, will default to 6379 if not specified"   ❶
config_param :port, :integer, default: 6379, secret: false, alias: :portNo   ❷
desc "Defines the host address for Redis, if not defined 127.0.0.1"
config_param :hostaddr, :string, default: "127.0.0.1", secret: false   ❸
desc "Defines the name of the list to be used in Redis, by default this is Fluentd"
config_param :listname, :string, default: "fluentd"
desc "Defines the number of reconnection attempts before giving up on connecting to the Redis server"
config_param :reconnect_attempts, :integer, default: 2
desc "Defines the number of seconds before timing out a connection to the Redis server"
config_param :connection_timeout, :integer, default: 5
desc "Defines the number of log events in a chunk"
config_param :chunksize, :integer, default: 20
❶ Providing the desc entry before each config_param means that tools can be used to generate the documentation for the plugin configuration, as well as provide documentation internally.
❷ This is an example of setting an integer configuration value and defaulting it. With the secret parameter, we can tell Fluentd whether the value should be included when it generates and outputs the summary of the configuration at startup. We have also told Fluentd that if it receives portNo as a configuration attribute name, it should use that as the source of the port number.
❸ As with the previous example, we are setting a default value. This time we are providing a string.
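For reference, a match block using these attributes could look like the following hypothetical fragment; the attribute names come from the listing above, and portNo exercises the alias defined for the port attribute:

```
<match *>
  @type redislist
  portNo 6379
  hostaddr "127.0.0.1"
  listname "fluentd"
</match>
```

Because every attribute has a default, an empty match body would also be valid; the fragment simply makes the values explicit.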
Once defined, we can refer to the values as class-level elements using the Ruby @ notation (e.g., @port) within the rest of the code.
We should also hold the Redis connection as a class member, so we do not need to re-create the connection every time we interact with Redis. With the configuration details available, we could try to create the Redis connection now. But by the life cycle definitions, that would be incorrect; if another plugin’s configuration failed, Fluentd might choose not to start our plugin, and we would be consuming unnecessary resources, such as the network connection. We do, however, need the Redis dependency incorporated into the code with a require "redis" declaration at the top of the file.
Note When it comes to coding style, I create the steps needed in the plugin as small functions. This may appear inefficient, but the optimizers in compilers, interpreters, and language virtual machines can usually optimize out these overheads. This approach does make for easier testing, and each step is easier to examine in isolation.
The framework provides a configure
function, as seen in the life cycle (see figure 9.3). This gives us the chance to implement any additional custom validation needed. We can also use this method to define our own class-level variables. To illustrate this capability, we will implement code to check if the network port defined is the default Redis port. If it isn’t the default port, log a warning message to remind developers that they need to ensure ports are deconflicted. This code is in the following listing, with both our check function and the implementation of the configure function, which uses any inherited behavior.
def check_port(conf)
  log.trace "check_port invoked"
  port = conf['port']   ❶
  if (port == RedislistOutput::DefaultPort)   ❷
    log.info ("Default Redis port in use")
  else
    log.warn ("Non standard Redis port in use - ensure ports are deconflicted")
  end
end

def configure(conf)   ❸
  super   ❹
  check_port(conf)   ❺
end
❶ Retrieves the configuration value from the list of configuration attributes using the defined name
❷ We defined a constant in the class for the default port.
❸ This is the method that is invoked as part of the plugin life cycle and triggers our check_port method.
❹ Makes sure any inherited logic is executed first
❺ Invokes our method, passing all the configuration data over
After the configuration, the most critical functions in the life cycle for most plugins are the start and shutdown functions. These are the ideal moments to create or close connections, such as those to Redis. Since establishing connections to storage solutions is often relatively slow, we want to perform the task before actively communicating with the remote solution. Redis allows us to define time-outs on the connections and how many reconnection attempts can be made; we have already set these up as configuration values with sensible defaults. If the connection fails to be established, we should make the failure easy to recognize later in the plugin life cycle. We can do this by setting the connector to nil and logging the issue.
We need to incorporate the start and shutdown functions shown in the following listing into our plugin and our supporting functions to ensure that the required behavior is achieved.
def connect_redis()
  log.trace "connect_redis - create connection if nonexistent"
  if !@redis
    begin
      @redis = Redis.new(host: @hostaddr, port: @port,
                         connect_timeout: @connection_timeout,
                         reconnect_attempts: @reconnect_attempts)   ❶
      log.debug "Connected to Redis " + @redis.connected?.to_s
    rescue Redis::BaseConnectionError, Redis::CannotConnectError => conn_err   ❷
      log.error "Connection error - ", conn_err.message,
                " connection timeout=", @connection_timeout,
                " connection attempts=", @reconnect_attempts
      @redis = nil
      return nil
    rescue => err
      log.error "Error connecting to redis - ", err.message, "|", err.class.to_s
      @redis = nil
      return nil
    end
  end
end

def start
  super
  log.trace "starting redis plugin"
  connect_redis()   ❸
end

def shutdown
  super   ❹
  log.trace "shutdown"
  if @redis   ❺
    begin
      @redis.disconnect!
      log.debug "disconnecting from redis"
      @redis = nil
    rescue
      log.error "Error closing Redis connection"
    end
  end
end
❶ Establishing the Redis connection, informing the connection parameters from values retrieved from the configuration properties and defined constants
❷ Handles Redis connection errors separately from the general catchall, as we can guide the user more effectively for these kinds of issues
❸ Establishes a connection to Redis as part of the startup once all the inherited activities are completed
❹ Ensures any inherited tasks are completed
❺ If we still have a Redis connection, then start the disconnection process.
With enough code to prove we can get our plugin to at least start and stop, we can run a simple test with the full Fluentd. The Fluentd tooling includes extensions to help with unit testing, but it is always rewarding to see code firing up as part of something bigger, particularly when the startup is quick. To do this, we need to ensure Fluentd can pick up our plugin code.
We need a test configuration to be able to run Fluentd. We have prepared one for this job, and it can be retrieved from Chapter9/Fluentd/dummy-plugin.conf
Of course, you might choose to develop your own configuration, given everything you’ve learned in the book.
We have again used the dummy source plugin to generate log events, as the content is not essential. The crucial element is the match configuration, as illustrated in the following listing.
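A minimal sketch of that match section, with the buffer parameters shown as placeholders, looks like this:

```
<match *>
  @type redislist            ❶
  # <buffer>                 ❷
  #   flush_interval 2s
  # </buffer>
</match>
```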
❶ This is the declaration to use our plugin.
❷ As we build upon the base class provided, the in-memory helper plugin is available if the use of the buffer attribute is defined. For now, we have the buffer configuration commented out.
Before starting Fluentd, we need to start the Redis server as we did earlier in the chapter. Eventually, we will want to package and deploy our plugin just like any other Fluentd plugin using the gem tools. But to start with, we do not want to go through the additional effort of deploying and undeploying a gem every time we make a change. To avoid the gem deployment issue, we can add parameters to the command line telling Fluentd to pick up the source code of our plugin from a Ruby file. For example, my copy of the plugin directory structure starts at c:\myDev\GitHub\UnifiedLoggingWithFluentd. Then the extended Fluentd command will look as follows:

fluentd -c Chapter9\Fluentd\dummy-plugin-out.conf -p ➥ c:\myDev\GitHub\UnifiedLoggingWithFluentd\Chapter9\fluent-plugin-out-➥ redislist\lib\fluent\plugin -vv
Going forward, we will show the path as <plugin absolute path>\Chapter9\fluent-plugin-out-redislist\lib\fluent\plugin, where you need to substitute <plugin absolute path> accordingly.
Combining the configuration file with the extended plugin path means that when we start Fluentd, it will display the configuration as it starts up. With the Redis server running at the command line, it will be possible to see Redis logging the number of connections it has as we start and stop Fluentd.
Your objective is to restart the Redis server on a different port and create an alternative Fluentd configuration to connect to Redis on that port. You need to confirm that our configuration check is performing correctly. To restart Redis on a different port, add --port nnnn to the startup command, where nnnn represents the port number to use.
The modified configuration file solution can be found at Chapter9\ExerciseResults\dummy-plugin-Answer.conf, where we have changed the port from 6379 to 16379. We also modified the log message generated by the dummy plugin, although we will not see that yet.
When the Redis server is started, we need to add --port 16379 to override the default. Our Fluentd startup command now becomes

fluentd -c Chapter9\ExerciseResults\Fluentd\dummy-plugin-Answer.conf -p ➥ <plugin absolute path>\Chapter9\ ➥ fluent-plugin-out-redislist\lib\fluent\plugin
When Fluentd starts up, we should see the warning in the log output from Fluentd about using a nonstandard port. But the Redis server should report the connection.
Having proven we can configure, start up, and shut down the plugin, we can move to the next step of implementing the logic of sending the events to Redis. There are several ways the logic can be executed:
Synchronous—Process each event by implementing the method def process(tag, es). This is the most straightforward approach and the least performant, as it does not use any buffering.
Synchronous buffered—Output is implemented by the method def write(chunk).
Asynchronous buffered—Output is implemented by the method def try_write(chunk).
Which implementation method is used is dictated by whether the configuration includes a <buffer> section, unless we configure some override to the standard behavior. For the first implementation, we will keep it nice and straightforward with the synchronous model.
Our process implementation receives the tag and the event stream (es) and iterates through the events. As the stream could contain multiple events, we can make the process a little more efficient by telling Redis to batch the insertion of the events. This is done by telling Redis it will receive multiple transaction calls using the command redis.multi. Once we have iterated through the events, we tell Redis it can execute the transactions using the call redis.exec. As we iterate through the log events, we need to perform the following actions:
Build a JSON representation of the log event(s). If you have reviewed the output interface, you will note that there is a predefined formatter function. We have chosen not to override or use it, as we do not want to impact other uses of this method within the plugin’s base classes; we can therefore format the representation in any desired manner (e.g., using msgpack).
We should be defensive in our code to handle the scenario of losing the Redis connection before all the events have been committed to Redis.
We need to transform the log event to JSON for potentially three different functions, so we should write the logic once and invoke it from the different plugin methods involved. The result is the two methods shown in the following listing.
def redisFormat(tag, time, record)   ❶
  redis_entry = Hash.new
  redis_entry.store(RedislistOutput::TagAttributeLabel, tag.to_s)   ❷
  redis_entry.store(RedislistOutput::TimeAttributeLabel, time.to_i)
  redis_entry.store(RedislistOutput::RecordAttributeLabel, record.to_s)
  redis_out = JSON.generate(redis_entry)   ❸
  return redis_out
end

def write(chunk)   ❹
  log.trace "write:", chunk
  @redis.multi   ❺
  chunk.each do |time, record|
    log.debug "write sync redis push ", chunk.metadata.tag, time, record, @listname
    @redis.lpush(@listname, redisFormat(chunk.metadata.tag, time, record))
  end
  @redis.exec   ❻
end
❶ This is our function that translates the log event into a JSON representation. We need to build our own JSON representation, as we need to capture all the log event attributes.
❷ By using predefined constants, these could be shared with the input plugin.
❸ As the record, event time, and tag represent a flat structure, we can build a simple hash structure and then exploit the prebuilt operations to convert it to JSON. Note we don’t use the formatter method, as this is used by other parts of Fluentd, such as the buffer, and we don’t wish to confuse that logic with a variant representation.
❹ This is one of the standard functions used by an output plugin.
❺ This tells Redis to accept multiple statements that should be executed in a single operation.
❻ This releases the Redis library to send all the statements to the Redis server as a single block.
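The formatting half of the listing can be exercised outside the plugin class. In this standalone sketch, the attribute-label constants are assumed to be "tag", "timestamp", and "record", matching the JSON used in the earlier CLI simulation:

```ruby
require 'json'

# Stand-ins for the plugin's class constants (assumed values)
TAG_LABEL    = "tag"
TIME_LABEL   = "timestamp"
RECORD_LABEL = "record"

# Mirrors redisFormat: flatten tag, time, and record into one JSON string
def redis_format(tag, time, record)
  redis_entry = Hash.new
  redis_entry.store(TAG_LABEL, tag.to_s)
  redis_entry.store(TIME_LABEL, time.to_i)
  redis_entry.store(RECORD_LABEL, record.to_s)
  JSON.generate(redis_entry)
end
```

Note that the record is stored via to_s, so it round-trips as a string rather than nested JSON; this is why the input plugin later pulls the record out with fetch(...).to_s.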
We have reached a state where we can confirm that we can write to Redis. Using the previously illustrated approach, restart Fluentd and use the illustrated Redis commands to review the list in Redis and pop entries in the list.
In repeating the Fluentd test with the write method now in place, you should expect the Redis commands to show the list structure growing with JSON content, looking something like this:
As the log events are generated, the counter value will increment due to the source configuration. This means there should be a correlation between the counter attribute values and the Redis list length, which can be confirmed with the llen command in the Redis CLI. You will also be able to pop the entries from the list using the command lpop fluentd, as the configuration uses the default list name.
Before implementing one of the other write methods, let’s complete the circuit with the input plugin. We can use the same utility that generated the output plugin to generate the skeleton folders and files for the input side. Everything is the same as before when rerunning the utility, except that we specify the plugin type as input, not output. This will result in the generated code extending a different base class (as we saw in figure 9.2); thus, we need to implement some different functions.
For Redis, the input plugin is effectively a polling activity, as most solutions don’t support callbacks or webhooks (it is worth noting that Redis does offer a publish/subscribe mechanism). This means we will need a configuration value controlling how frequently the plugin polls Redis. As with the output plugin, we will need the information necessary to connect to Redis. For this latter task, we can copy the code written for the output plugin. While this does not support the excellent coding principle of DRY (don’t repeat yourself), there are plenty of opportunities to improve our code later.
Although we have some additional values to consider, the input plugin processes the configuration attributes just as the output plugin does. The two key functions that need to be implemented in the input plugin are the run method and the emit function (as shown in listing 9.6). The run method is responsible for starting our scheduling thread. The emit function handles calling Redis and emitting the log events to the next process in Fluentd defined by the configuration file.
As a source plugin, the framework will set the tag and timestamp values on the event to reflect the current time and the default tag behavior. These values may not make sense, as they do not truly reflect when the original event occurred. To address this, we provide the means to determine whether the original tag and time should be added to the event record or used in the core log event; it is probably best to let the person configuring the plugin decide. If the values should be inserted into the log event record, we must determine what attribute names to use. We can address all of this using the same mechanism for capturing plugin configuration attributes already used.
def emit
  log.trace "emit triggered"
  if !@redis   ❶
    log.debug "reconnecting Redis ", @hostaddr, ":", @port
    connect_redis()
  end
  if @redis
    keep_popping = true   ❷
    while keep_popping
      if (@fifo)   ❸
        popped = @redis.rpop(@listname)
      else
        popped = @redis.lpop(@listname)
      end
      log.debug "Popped ", @listname, ": ", popped
      if popped
        data = JSON.parse(popped)
        if (@use_original_time)
          time = data[TimeAttributeLabel]
        else
          time = Fluent::EventTime.now
        end
        if (@use_original_tag)
          tag = data[RedislistInput::TagAttributeLabel]
        else
          tag = @tag
        end
        data_record = data.fetch(RecordAttributeLabel).to_s
        log.debug "original data record=>", data_record
        if (@add_original_time && !(data_record.include? '"'+@add_original_time_name+'"'))
          data_record = inject_original_value(data, data_record,
              RedislistInput::TimeAttributeLabel, @add_original_time_name)   ❹
        end
        if @add_original_tag && !(data_record.include? '"'+@add_original_tag_name+'"')
          data_record = inject_original_value(data, data_record,
              RedislistInput::TagAttributeLabel, @add_original_tag_name)
        end
        log.debug "Emitting -->", tag, " ", time, " ", data_record
        router.emit(tag, time, data_record)   ❺
      else
        keep_popping = false
      end
    end
  else
    log.warn "No Redis - ", @redis
  end
end

def run
  log.trace ("run triggered")
  while thread_current_running?   ❻
    current_time = Time.now.to_i
    emit() if thread_current_running?
    while thread_current_running? && Time.now.to_i <= current_time
      sleep @run_interval   ❼
    end
  end
end
❶ Determines whether a new connection is required
❷ Sets the loop controller up so we keep calling Fluentd until a shutdown process changes the status of this flag
❸ Redis allows us to treat a list as first in, first out (FIFO) or last in, first out (LIFO). So we can use this configuration to control whether we want to operate the list in a FIFO or LIFO manner.
❹ Determines whether the original tag and date-time values should replace those on the new log event or simply be incorporated into the log event record.
❺ This is when we tell Fluentd to pass the log event onto the next step of the process based on the configuration definition.
❻ The thread handling for the run method; the loop continues for as long as the thread is allowed to run.
❼ Once we’ve decided the thread has legitimately been woken since the last cycle, we can use emit to send all the log events.
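The inject_original_value helper invoked in the listing is not shown; its signature is taken from the call sites, and this standalone body is an assumption about what such a helper might do:

```ruby
require 'json'

# Hypothetical helper: copy an attribute (e.g., the original timestamp) from
# the outer Redis entry (data) into the log event record's JSON body
# (data_record) under a configurable new name, returning the updated JSON.
def inject_original_value(data, data_record, attribute_label, new_name)
  record = JSON.parse(data_record)
  record[new_name] = data[attribute_label]
  JSON.generate(record)
end
```

The emit method guards each call with a data_record.include? check, so the helper never has to worry about overwriting an attribute that is already present.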
With our input plugin implemented, we can perform a simple test. We could easily incorporate both the input and output plugins into a single configuration (see Chapter9/fluentd/dummy-plugin.conf); the problem would be that the log information for input and output would intermingle. We would also need to extend the plugin path parameter to include both plugins, like this:

fluentd -c Chapter9\fluentd\dummy-plugin.conf -p <plugin absolute ➥ path>\Chapter9\fluent-plugin-out-redislist\lib\fluent\plugin -p <plugin ➥ absolute path>\Chapter9\fluent-plugin-redislist\lib\fluent\plugin
Alternatively, we can start two instances of Fluentd, each using its own configuration, so each process runs one plugin. This will make seeing what is happening a lot easier. To do this, repeat the steps previously used to see the output plugin at work. Then, in another console window, we can adapt the command to reference our input plugin path and the Chapter9\Fluentd\dummy-plugin-in.conf configuration that we have prepared already. The result should look like this:

fluentd -c Chapter9\Fluentd\dummy-plugin-in.conf -p <plugin absolute ➥ path>\Chapter9\fluent-plugin-redislist\lib\fluent\plugin
With everything running, you should see messages in one console showing log events being added, another console showing them being removed, and the Redis console displaying the two interactions of adding to and removing from the list.
With the ability to write and consume log events with a Redis list now in place, let us go back to the write logic and extend the implementation with alternative ways for the output logic to work, such as using the buffer.
As we saw in chapter 4, if the plugin supports a buffer, the I/O process can be optimized. We have already done some performance optimization by configuring the synchronous process to bunch the Redis push operations together, so the Redis connector executes them all at once if we receive more than one log event. But we can further accelerate the process by using the buffer to process larger groups into a single transaction in Redis.
As we saw in chapter 4, out of the box, there are two types of buffers for Fluentd using either a temporary file or memory. Supporting a file implementation does not make sense when our target is an in-memory solution that provides better performance than using physical storage such as disks. This means our solution should only allow the use of the memory buffer.
In figure 9.4, we can see how the log event passes through the different paths of the base Fluentd output class and the functions that need to be implemented, depending on whether buffering is used and whether buffering is synchronous or asynchronous in nature.
As shown in figure 9.4, to keep things simple, we started with the synchronous unbuffered path so that we had working input and output plugins; now we will revisit the output plugin to extend it for the buffered use case.
The asynchronous path is largely the same as the synchronous path regarding how we interact with Fluentd. But we have to manage the additional logic that makes the behavior asynchronous and uses the buffer chunk data structures. With the buffering, we could offer either or both approaches to handling the buffer chunks. The options include the following:
Each chunk is controlled by the number of log events it can contain. This is a simple mechanism and very efficient and predictable when log events are consistent in size. But if log events are variable in size, you can exhaust the available memory before filling a chunk.
Each chunk is controlled by allocating the same amount of memory. With the size model, we have the responsibility to implement the logic around calculating the size of the log event, determining if the log event can fit into the current chunk or needs to be put into another chunk. Also, we must decide what to do if the log event is larger than the chunk size limit.
The benefit of this model is that if log events are variable in size, we don’t risk memory exhaustion scenarios, as we’ve capped the amount of resources that will be used. The additional code complexity here is not so much about how Fluentd plugins are written as about understanding Ruby and how it works with data structures.
In the spirit of “keep it simple, stupid” (KISS) and focusing on Fluentd, we will illustrate the number-of-log-events-per-chunk model; the responsibility for understanding the log events and avoiding the risk of memory exhaustion rests with the user. With our approach worked out, we should make things easy for the user by defaulting some of the buffer settings to the values we would like. As a result, we can incorporate the following code fragment into the configuration definitions:
config_section :buffer do
  config_set_default :@type, 'memory'
  config_set_default :chunk_keys, ["tag"]
  config_set_default :chunk_limit_records, @chunksize
end
As you can see, we have defined a chunk size configuration attribute, which is itself defaulted. As a result, we will default to the buffering approach without needing any configuration values, unless explicitly overridden. We should also extend the configure function to consider the possibility of being supplied with configuration values that we do not recommend or do not support, because they are intended for the memory-size model of buffering.
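To illustrate the kind of guard the extended configure function might apply, here is a plain-Ruby sketch of the validation idea. The setting names and the error type are assumptions for illustration; this is not the Fluentd plugin API itself, where such a check would sit after the call to super in configure and raise Fluent::ConfigError.

```ruby
# Plain-Ruby sketch of rejecting buffer settings meant for the byte-size
# chunk model; the setting names here are assumptions for illustration.
UNSUPPORTED_BUFFER_KEYS = %w[chunk_limit_size total_limit_size].freeze

def check_buffer_config(buffer_settings)
  bad = buffer_settings.keys & UNSUPPORTED_BUFFER_KEYS
  unless bad.empty?
    raise ArgumentError, "unsupported buffer settings: #{bad.join(', ')}"
  end
  buffer_settings
end

check_buffer_config('chunk_limit_records' => 50)   # accepted
begin
  check_buffer_config('chunk_limit_size' => 1024)
rescue ArgumentError => e
  puts e.message
  # → unsupported buffer settings: chunk_limit_size
end
```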
The next step is implementing the write function, which takes a buffer chunk once it is complete and processes the chunk, building up the Redis push calls. We can leverage the previous logic to generate the representation we want to use in Redis. Just as with the synchronous bufferless path, we want to brace the looping through the chunk with the instruction to the Redis connector to batch up all the Redis commands and execute them in one go, using the redis.multi and redis.exec functions. The new write method is shown in the following listing.
def write(chunk)    ❶
  log.trace "write:", chunk
  @redis.multi    ❷
  chunk.each do |time, record|    ❸
    log.debug "write sync redis push ", chunk.metadata.tag, time, record, @listname
    @redis.lpush(@listname,redisFormat(chunk.metadata.tag,time,record))    ❹
  end
  @redis.exec    ❺
end
❶ The function is called from the base class logic. We have implemented our own version. Unlike functions like configure, calling super would trigger a not-implemented exception.
❷ Instructs the Redis connector to group all the Redis statements that follow
❸ The chunk has a different structure from the stream structure provided in the unbatched path, so we need a different loop; this iterates over each chunk entry.
❹ As before, we need to take the log event, time, and tag to build a representation to be used in Redis.
❺ Handles Redis connection errors separately from the general catchall, as we can guide the user more effectively for these kinds of issues
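The write method above calls a redisFormat helper to build each list entry. A hypothetical sketch of what such a helper could look like follows; the key names 'tag', 'time', and 'record' are assumptions chosen to match the shape the input plugin's emit method expects, not the book's exact code.

```ruby
require 'json'

# Hypothetical redisFormat-style helper: wraps tag, time, and record into a
# single JSON string to push onto the Redis list. Key names are assumptions.
def redis_format(tag, time, record)
  { 'tag' => tag, 'time' => time, 'record' => record.to_json }.to_json
end

puts redis_format('demo', 1234567890, 'msg' => 'hi')
# → {"tag":"demo","time":1234567890,"record":"{\"msg\":\"hi\"}"}
```

Note that the record is serialized to a string before wrapping, which is why the input plugin's emit has to fetch it back out and parse it separately.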
We can rerun our test with a slightly different configuration to ensure we use the buffering behavior with this method introduced. This can be done with the following command:

fluentd -c Chapter9/fluentd/dummy-plugin.conf -p <plugin absolute path>/Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin -vv
With the test configuration provided, we should see the write method trace statements occurring periodically. The internal tracing writes log events to stdout in short bursts as the write function logs the details of its calls to the Redis connector.
Our proof-of-concept-level implementation of the Redis list plugin has shown how we can deliver a new plugin. During development, we found some commonality between the input and output code, and we have received the go-ahead to make some improvements. Therefore, the first goal is to refactor the input and output to use a common base class.
The testing we have done so far is a manual process, which, as we all know, is not best practice. In the real world, we would lead with unit testing and build up from there. Ideally, changes in the code should trigger a continuous integration and continuous delivery process, automatically running the unit tests and end-to-end testing.
We will not go into depth here, as unit testing is primarily an aspect of Ruby development, rather than Fluentd. The Fluentd team has built some support libraries that can be used with any major unit test frameworks, including test-unit (https://test-unit.github.io/), RSpec (https://rspec.info/), and minitest. Our example utilizes test-unit, as it is a well-adopted framework and feels like many other major unit test frameworks, such as NUnit, JUnit, and so on.
When we used the utility to generate the plugin skeleton and the main Ruby code, the tool also generated a test folder structure in the base plugin folder, including a skeleton class to help us get started. The test class has the same name as the plugin but with a prefix of test_. There is also a small piece of helper code (helper.rb) in the base of the test path; this loads the framework’s helper code into the test-unit tool.
To illustrate the possibilities, we have built a couple of tests for the output plugin, as illustrated in listing 9.8. These tests focus on validating the configuration-related logic that drives the behavior of our plugins. This is achieved using part of the Fluentd test framework that implements different types of drivers. The type of driver needed is dictated by the plugin type and mimics the core of Fluentd. We can trigger the necessary operations using the driver, including feeding in log events to the plugin. The driver also provides the means to retrieve and evaluate the results, such as how many times events have been through specific stages (e.g., emit, write). The evaluation can be done using the driver to access and examine the processed events, such as the output of the synchronous and asynchronous write operations, and to simply know how many events have been handled.
The driver capability can be extended to handle the impact of scheduled activities, such as events accumulating in a buffer. The test utilities can also capture standard out and process the text. Capturing such output allows you to verify that the processing of log events is generating output as expected; we apply this technique in our test for advanced config. If log events are not going to stdout but to a log file for Fluentd, the log file can also be interrogated, looking at how much log information is generated and whether specific log events have occurred.
test 'advanced config' do    ❶
  conf = %[    ❷
    host 127.0.0.1
    port 24229
  ]
  d = nil    # declared outside the block so the driver is still visible for shutdown
  captured_string = capture_stdout do    ❸
    d = create_driver(conf)    ❹
    assert_equal 24229, d.instance.port    ❺
    assert_equal '127.0.0.1', d.instance.hostaddr
  end
  assert_true (captured_string.include? "Non standard Redis port in use")    ❻
  d.shutdown    ❼
end
❶ Defines the test case and gives it a descriptive name
❷ Creates a set of configuration values to be passed
❸ Defines a variable to capture any stdout generated within the following statement block
❹ Creates the driver using the test config provided
❺ Tests the values set in the plugin for the port
❻ Evaluates whether the warning about no standard port has been produced
❼ Shuts down everything cleanly
To execute the unit tests, we simply need to follow the unit test framework’s guidance. In the case of test-unit, that comes down to using Ruby to execute the unit test file. For example:
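For instance, assuming the generated test file follows the test_ naming convention described earlier (the exact file name here is an assumption; adjust it to your layout), the invocation might look like this:

```shell
# File name assumed from the test_ prefix convention
ruby test/plugin/test_out_redislist.rb
```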
Previously, we identified the need to test different configurations by running Fluentd with alternative configuration files. These should be replaced with unit tests. As the output plugin has restricted how we can use buffering, we need to further test the configuration handling. This is best done with unit tests for the configure and write functions.
Within the Chapter9/ExerciseResults directory, we have included two child directories called test-out and test-in. These contain the directory structures and files with additional unit tests covering configure and other operations that you can compare against.
Having completed testing, we can think about packaging and deploying our plugins. This includes preparing the metadata files and documentation.
Part of packaging up a solution includes providing the licensing information and documentation for the plugin. The template utility will have provided a standard license document and a basic README. The most important thing here is to ensure that the readme is clear and complete. Like any good product, the plugin’s successful use is predicated on people understanding how to use it, so good documentation will make a meaningful difference. In the download pack (and GitHub repository), we have provided a separate completed readme, so the initial generated state (readme.md) and the final state (readme-final.md) can be compared. You may notice in the readme content some instructions for helping to complete the Gemfile, which we will come to in a moment.
The task of generating the documentation for the plugins is greatly simplified using another Fluentd-provided utility. The fluent-plugin-config-format utility takes the plugin type (specified in the same way as for the utility that created the skeleton) and the plugin’s name as parameters. We can then tell the utility how we would like the documentation to be generated. In the following example, we have used markdown (this makes it easy for GitHub and other Git-like repositories to render the output nicely; other options include plain text and JSON). As we want to generate the documentation from the source, we need to provide the path to the Ruby plugin code. Using this utility with the following parameters will produce the documentation about the plugin configuration information:
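As an illustration only, the invocation could take a shape like the following; the exact flags and the registered plugin name are assumptions, so confirm them against the utility's own help output.

```shell
# Flags and plugin name assumed; verify with: fluent-plugin-config-format --help
fluent-plugin-config-format output redislist --format=markdown --plugin=./lib/fluent/plugin
```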
The result of the utility is sent to the console rather than to a file, so we need to cut and paste the output into our readme.md file (or use some shell/console tricks to pipe the output into the file). We have incorporated the output into the readme-final.md file.
In the directory structure shown in figure 9.2, we had a doc folder that was not generated by the Fluentd utility. This comes from running RDoc or YARD on the code, resulting in developer-level documentation being generated. In this case, we have opted to use YARD, as it provides some additional neat features over standard RDoc. To find out more about YARD and install it, see appendix E. Note that if you prefer to stick with RDoc, the metadata tags YARD uses will appear in the generated output.
With YARD installed, maintaining this documentation comes down to looking after the code and its comments and running the commands from the root folder of the plugin:
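With a default YARD setup, the documentation can typically be regenerated with the yardoc command; the path shown here is an assumption based on the standard plugin layout.

```shell
# Run from the plugin root; output goes to the doc folder by default
yardoc lib/fluent/plugin/*.rb
```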
This will update the documentation for you, but it focuses on general Ruby code and comments, not the specifics relating to Fluentd, such as the configuration values and their parameters.
The Gemfile needs to be updated as directed by the README in its defaulted content. Therefore, we need to add to the Gemfile the name of the gem we want to create. It takes the form of gem "<name of the gem>"; for example, gem "fluent-plugin-out-redislist" for our output plugin.
The gemspec file will also need to be completed with additional information, including the summary, description, home page, license, contact details, and versioning (Semantic Versioning format is expected; https://semver.org). This also needs to include details of any dependencies. During setup, we installed several additional gems, such as the Redis connector. This means we need to add the dependencies into the gemspec file so they are retrieved when this plugin is installed:
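A dependency declaration in the gemspec might look like the following sketch; the spec variable name comes from the generated template, so treat the exact form as an assumption.

```ruby
# In the .gemspec file; `spec` is the block variable from the generated template
spec.add_runtime_dependency "redis", ">= 4.0"
```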
This indicates that the Redis gem is needed at a version of 4.0 or later. The gemspec standard does allow for complex rules to be defined for what versions can be used.
Once complete, we can package everything up using the RubyGems tooling that we previously installed; this means we can use the gem tools to create the final package. But be careful where you execute the command from, as the gemspec file includes scripting that uses relative paths to locate all the files that need to be included. We can switch on verbose mode using the -V parameter to the command to make it easy to see what is going on. To complete the task, we use the command to create the gem file:
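Assuming the gemspec file is named after the gem (an assumption based on the generator's conventions), the build command looks like this:

```shell
# Run from the plugin root folder; -V switches on verbose output
gem build fluent-plugin-out-redislist.gemspec -V
```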
With the gem file created, we need to install it into our local library of gems. Once that is done, we can use the gem without providing the path to the actual Ruby code. This can be done with the command
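The install step references the freshly built gem file; the version suffix depends on what was set in the gemspec, so it is left as a placeholder here.

```shell
gem install fluent-plugin-out-redislist-<version>.gem
```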
We can confirm that the gem is in place by executing the command
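A filtered listing keeps the output manageable:

```shell
gem list fluent-plugin
```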
This should yield a list including our gem.
With the plugins built and the gems created and installed, we can rerun our test scenario. This time, the test can be run without referencing the Ruby code directly using the -p parameter in the Fluentd command line, as we have made the code available via the gems we created. As a result, we can see our plugin working, but with execution commands that reflect the conventional way of working we would see in production.
The plugin development has been successful and has met all our requirements. But it doesn’t yet reach the level at which we could call the solution suitable for enterprise-class use cases. To bring things up to a suitable standard, we recommend some changes:
Credentials used on the Redis connection: configure the plugin to optionally use an authenticated connection with Redis. This should be done in a way that allows the credentials to be injected from a secure source, such as Vault.
Using an authenticated connection to Redis means passing credentials. We do not really want to be doing that with an HTTP connection with the credentials in cleartext. So, the connection to Redis should be implemented securely with SSL/TLS certificates.
One of the features within Redis, and what makes it such a good cache solution, is the inclusion of TTL. By incorporating TTL, we can control the size of the Redis buffer, and if we cannot keep up, then events will simply expire.
As events are handled, some basic stats could be generated that could potentially be consumed by Prometheus.
Extend and enhance the unit testing to achieve a target of greater than 60% coverage.
Develop the prebuilt Rakefile to include and execute all the unit tests. Also, incorporate related activities, such as hooking the process into a continuous integration tool suite.
Consolidate the readme.md documentation rather than using the separate readme-final.md file.
If you wish to hone your plugin development skills, or use this as a practical opportunity to develop your Ruby skills, we encourage you to try implementing these suggested improvements. As you won’t be following our steps when implementing these features, we recommend that you adopt your preferred development tool(s). We do not have a reference solution, but running the test scenarios as you implement your solution to this exercise will confirm a successful outcome functionally.
Fluentd plugin tooling provides the means to create the skeleton code to develop both input and output plugins.
Fluentd’s skeleton supports asynchronous and synchronous buffering output plugin functions.
Fluentd input plugins need to be able to set the time and tag details. The input plugin implements this logic.
The plugin framework includes defining, creating, and defaulting the configuration properties for the plugin loaded from the configuration file.
Fluentd plugins are typically made available as RubyGems. The utilities provided by Ruby and Fluentd make this easy to achieve.