9 Creating custom plugins

This chapter covers

  • Developing custom Fluentd plugins for Redis
  • Using Fluentd utilities to speed up development
  • Implementing the Fluentd plugin life cycle methods
  • Testing and packaging custom Fluentd plugins
  • Creating documentation for the custom plugins

At various points in the book, we have referred to Fluentd’s support for developing plugins beyond those provided by the core product. The extensibility of Fluentd has led to a robust ecosystem of third-party plugins that make it easy to capture, filter, and manipulate log events and send them to many different systems and data stores. We have also discussed how custom plugins can connect to and monitor esoteric and legacy solutions when this cannot easily or efficiently be achieved with the existing plugins.

In this chapter, we will walk through the process of creating an input plugin and an output plugin that make use of Redis’s list capability. We will also take a closer look at Redis and the rationale behind its use.

Developing Ruby

This chapter does call for some development experience, but you do not need to be a hardcore Ruby developer to take the information shown here and put it into action. As with most languages, once you have development experience with one language, you can use that understanding to start coming to grips with others. I count myself in this category, having come through a career of programming with Ada, C, C++, and then Java for the last 15 or more years. We are not going to write another Ruby book; others have already done an exceptional job of this, such as David A. Black and Joseph Leo III with The Well-Grounded Rubyist, 3rd edition (Manning, 2019). I have tried to provide just enough detail for you to understand what we are doing with Ruby and the code without first learning the language. After all, our goal is to help you understand what is happening with Fluentd. In appendix E, we have included links to resources that can help you learn the basics of Ruby and better understand the tools used; you may also find them useful if you take on the exercises.

9.1 Plugin source code

Code for the plugin developed here is included with the download of the book or retrievable from our GitHub repository (http://mng.bz/M20W). If you want to build upon what is provided, we encourage you to fork the GitHub repository and develop it as you wish; enhancement opportunities you might like to consider include

  • Moving to use the RedisTimeSeries feature (more on this in a moment)

  • Developing support for Fluentd’s block size-based buffering

  • Increasing security on the connection to Redis (e.g., using SSL/TLS connections and credentials, such as username and password)

Whatever approach you take, all we ask is for you to acknowledge this book as the starting point in the code.

9.2 What is Redis, and why build a plugin with the Redis list capability?

Redis is an open source, scalable, in-memory storage solution built around name-value pairs. The ability to define details such as time to live (TTL, the period for which data is held before it is automatically deleted) on the data elements it holds makes it an exceptional caching tool. (In appendix E, we have provided plenty of supporting references for Redis in addition to those for Ruby.)

Aside from using Redis to help demonstrate plugin development, there are some potential real-world benefits of developing such a plugin:

  • Rather than using a small, embedded, in-memory cache, we can use a highly resilient open source option that can be far more effective in scaling and replicating the cached data across multiple servers (for more insight into the use of caches, check out https://techterms.com/definition/cache).

  • It provides an additional option for enabling Fluentd nodes to collaborate efficiently.

  • It enables integration with other tools that use Redis lists, such as Resque (a Ruby queuing implementation; https://github.com/resque), providing opportunities to support or use additional services.

  • It provides a means to keep events held in order, as Redis lists support the first in, first out (FIFO) pattern; this allows us to keep log events in sequence (i.e., in time series).

Caching solutions such as Redis operate by storing data in in-memory data structures rather than in long-term storage like a conventional database. The structures handle the data as key-value pairs. The internal structures can be very sophisticated to allow the cache to quickly locate the correct bit of memory. In addition to this, the data can be handled as a time series of events. The data held can also have a TTL before being automatically removed from the store.

9.2.1 Redis list over RedisTimeSeries

At the time of this writing, Redis Labs has developed the high-performance RedisTimeSeries, which handles data in a time-series format made of two 64-bit structures representing the time and a value (more at https://oss.redislabs.com/redistimeseries). Using it presents some challenges:

  • There is no support for a Windows native option, and we want to keep things as simple as possible without needing to work through the different approaches to Linux virtualization.

  • A log event will not fit into the 128 bits of the data structure used by RedisTimeSeries, which means that to use this structure, the value part must act as a “foreign key” to another data structure. Taking the example solution to an enterprise-class capability may make this worthwhile, but it adds complexity related to Redis, not Fluentd, and therefore won’t benefit this book.

Given these points, we will stick with the vanilla Redis features.

9.3 Illustrating our objective using Redis CLI

Before we start looking at any development activities, let’s simulate the behavior we want to achieve with the plugins. Simulating the behavior will make it easier to relate the development activities back to the solution. To do this, we first need to install Redis, using the details provided in appendix A. Once Redis is installed, we will use its command-line interface (CLI) to simulate the effect of the plugins we will build.

The Redis server needs to be started in its own shell. This can be done with the command

redis-server

Once we see evidence of that process running (Redis reporting to the console), the next step is to start the Redis CLI with the command redis-cli in a new console window. We can recognize when the command has worked, as the command prompt will look like

redis 127.0.0.1:6379 >

We can confirm that we have a proper Redis server connection by entering the command info. This will result in a lot of settings information, including the Redis version displayed in the console. With one redis-cli running, we need to repeat the process in a separate shell to simulate the effect of two different processes interacting with the cache. For the rest of this section, we will refer to these as CLI 1 and CLI 2. As we go through the simulation, imagine CLI 1 is acting as a Fluentd output plugin and CLI 2 as a Fluentd input plugin. In CLI 1, we want to run the following list push (lpush) commands:

lpush fluentd '{"tag":"demo", "timestamp" : 1606076261, "record" : {"blah" : "blah 1"}}'
lpush fluentd '{"tag":"demo", "timestamp" : 1606076263, "record" : {"blah" : "blah 2"}}'
lpush fluentd '{"tag":"demo", "timestamp" : 1606076267, "record" : {"blah" : "blah 3"}}'

Each command returns the new length of the list, so the first response will look like (integer) 1, and the number will grow as the list depth increases. We can use the list length (llen) command

llen fluentd

to find out how many entries exist in the list. The reference to fluentd in these commands is the name of the list we are using. Notice how we need to use single quotes at the beginning and end of the record so that we can use double quotes in the JSON. Now, in CLI 2, issue the right pop (rpop) command, which removes an entry from the opposite end of the list to the one lpush adds to:

rpop fluentd

As a result, you will see the first entry provided in CLI 1 displayed on the CLI 2 console. Repeat the command rpop fluentd, and the response will include the second record added. Add more entries on CLI 1 and continue popping on CLI 2 until you see a nil response, which means the list has been emptied. Had we used lpop instead, we would have retrieved the entries newest first, because lpush and lpop both work on the head (left end) of the list.
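The difference between popping from the head and popping from the tail can be checked without a Redis server. The following plain-Ruby sketch mimics a Redis list with an Array; the lpush, lpop, and rpop helpers are hypothetical stand-ins that only imitate the Redis commands of the same names.

```ruby
# Stand-in for a Redis list: index 0 is the head (left), the last index is the tail (right).
# These helpers only mimic the Redis commands of the same name; they do not talk to Redis.
def lpush(list, value)
  list.unshift(value) # LPUSH inserts at the head
  list.length         # Redis replies with the new list length
end

def lpop(list)
  list.shift # LPOP removes from the head; nil when the list is empty
end

def rpop(list)
  list.pop # RPOP removes from the tail; nil when the list is empty
end

fluentd = []
lengths = ["blah 1", "blah 2", "blah 3"].map { |msg| lpush(fluentd, msg) }
puts lengths.inspect # => [1, 2, 3]

fifo_copy = fluentd.dup
puts rpop(fifo_copy) # => blah 1 (oldest first, FIFO)
puts lpop(fluentd)   # => blah 3 (newest first, LIFO)
```

Pairing lpush with rpop gives the FIFO ordering that keeps log events in sequence.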

In our simulation, CLI 1 is effectively our output plugin, and CLI 2 is our input (source) plugin. This arrangement can help our Fluentd setup by smoothing out spikes in activity. If our Fluentd input and output nodes are transient in nature (as may be the case in a containerized environment), we also gain efficient in-memory storage of log events between them (holding events in memory avoids the I/O performance costs of writing them to disk).

9.4 Preparing for development

Before we can start any development, some basic preparation is needed. You need to have

  • Prepared an installation of Fluentd and Ruby, or continued using the existing installation from the previous chapters (if you would rather start fresh, all the necessary steps are detailed in appendix A).

  • Prepared a simple installation of Redis, if you have not already done this (details for this are in appendix A).

  • Chosen an IDE (integrated development environment) that can support Ruby and installed the IDE and any relevant extensions needed (we will be using Visual Studio Code). As this tends to be a personal choice, we will leave the choice and installation up to you.

  • Installed the Redis Gem to write Ruby code to talk with Redis (detailed in appendix A).

  • Established a folder for implementing our plugin; this may align with the folder structure created when you retrieved all the book’s support files.

    We do not want to accidentally pollute the use of Ruby and the catalog of gems with our development efforts. One way to achieve this is to allow our development code to be picked up by Fluentd when we start it rather than require it to be packaged as a gem and deployed to test code. There are also ways to comingle development code with the gems file system, which we would not recommend.

For the rest of this chapter, we will assume that the path starts as c:\myDev\GitHub\LoggingInAction\Chapter9. Once these components are in place, we are ready to start.

9.5 Plugin frameworks

As you have seen throughout the book, Fluentd works with a strong foundation of plugins of different types—input, output, filter, parser, and formatter. Fluentd needs to impose some common mechanisms, including naming conventions and folder structure, to make plugins work and tell the core of Fluentd about the different plugins installed. To help with this, Fluentd includes some tools to help make sure plugin development complies with the required conventions. Plugins have a class hierarchy with classes to build upon for each plugin type. Figure 9.1 illustrates this class hierarchy.

Figure 9.1 The foundation classes on which we can build our plugin, all of which are located within the lib/fluent/plugin folder of the source tree

9.5.1 Creating the skeleton plugin

Fluentd provides a tool for building the skeleton framework for our plugin. Before running it, you need to be in the location you want to use for Fluentd development (e.g., the root folder for your own GitHub project), as the tool uses that folder as the starting point for all the artifacts. Once in the correct location, we can use the command fluent-plugin-generate <plugin type> <plugin name>, where <plugin type> is the type of plugin (input, output, etc.) and <plugin name> is the plugin’s name. We are going to start with the output plugin, which makes the complete call:

fluent-plugin-generate output redislist

We have opted for the name redislist, as the plugin works specifically with Redis’s lists capability. As with most data stores, there is already a general-purpose Redis plugin in existence (http://mng.bz/aDP7); you might want to build a plugin to monitor its health as well.

As a result of using the utility, we get a directory structure as illustrated in figure 9.2, which includes some skeleton configuration and code files. As we progress through this chapter, we will address each of the files.

Figure 9.2 The directory structure and files generated when we run the fluent-plugin-generate utility. Colors indicate what needs to be modified to complete a plugin, as shown in the key.

The code for the plugin goes in the /lib/fluent/plugin folder shown in figure 9.2, in a Ruby file named after the plugin (e.g., out_redislist.rb). The code we develop in the following few sections will go into this file.

9.5.2 Plugin life cycle

Plugins are taken through a life cycle, with each stage having a method that can be overridden if you need to implement logic for that specific stage. As we will see during the development of our input and output plugins, we do not have to use every stage, but some are essential in nearly every case (for example, configure, start, and shutdown). Each of these stages also has a query method to determine whether that stage has been reached, for example, after_shutdown?. Figure 9.3 illustrates the full life cycle that a plugin supports along with the goal of each stage.

Figure 9.3 Plugins go through this life cycle, with methods provided in the skeleton that can be overridden to implement the specific logic needed at each life cycle stage. Not all stages have to be overridden.

9.6 Implementing the plugin core

Having generated the skeleton and looked at the life cycle stages, we can now add the plugin’s specific behavior: writing the code that processes the configuration, handles log events, and connects to and disconnects from Redis.

9.6.1 How configuration attributes work

With the plugin structured, the first step is to define the configuration attributes it will use. This is achieved with the config_param method, which takes the values for

  • The attribute name as it would be used in the configuration file.

  • The attribute’s data type, such as string, integer, float, bool, time, size, enum, array, or hash.

  • The default value, if one can be specified.

  • Whether the value should be kept secret (masked) when Fluentd outputs its configuration at startup or during a dry run. By default, this is false, so it does not need to be specified, but we have included it to illustrate the behavior.

  • The ability to define an alias for the attribute name. Aliasing can be helpful when a plugin changes over time.

We need to capture a port number, a host address (a DNS name or IP), and the name of the list held by Redis. To do this, we define several configuration attributes to be specified in the configuration file. The code is a sequence of desc and config_param statements, with each desc providing the description of the config_param that follows it. Because we know the data types and potential default values, we should exploit the available attributes to simplify data validation; this also keeps the user’s experience as simple as possible. As a result, we should end up with code like that in the following listing.

Listing 9.1 Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin/out_redislist.rb

desc "Specifies the port to connect to Redis with; defaults to 6379 if not specified"
config_param :port, :integer, default: 6379, secret: false, alias: :portNo

desc "Defines the host address for Redis; defaults to 127.0.0.1"
config_param :hostaddr, :string, default: "127.0.0.1", secret: false

desc "Defines the name of the list to be used in Redis; defaults to fluentd"
config_param :listname, :string, default: "fluentd"

desc "Defines the number of reconnection attempts before giving up on connecting to the Redis server"
config_param :reconnect_attempts, :integer, default: 2

desc "Defines the number of seconds before timing out a connection to the Redis server"
config_param :connection_timeout, :integer, default: 5

desc "Defines the number of log events in a chunk"
config_param :chunksize, :integer, default: 20

Providing the desc entry before each config_param means that tools can be used to generate the documentation for the plugin configuration, as well as provide documentation internally.

This is an example of setting an integer configuration value with a default. With the secret parameter, we can also tell Fluentd whether the value should be masked when it outputs the summary of the configuration at startup. We have also told Fluentd that if it receives portNo as a configuration attribute, it should use that as the source of the port number.

As with the previous example, we are setting a default value. This time we are providing a string.

Once defined, the values are available as instance variables, which we refer to with Ruby’s @ notation (e.g., @port) in the rest of the code.

We should also hold the Redis connection in an instance variable, so we do not need to re-create the connection every time we interact with Redis. With the configuration details available, we could try to create the Redis connection at this point, but by the life cycle definitions that would be incorrect: if another plugin’s configuration failed, Fluentd might choose not to start our plugin, and we would be consuming unnecessary resources, such as the network connection. We do, however, need to incorporate the Redis dependency into the code with a require "redis" declaration at the top of the file.

Note When it comes to coding style, I create the steps needed in the plugin as small functions. This may appear inefficient, but the optimizers in compilers, interpreters, and language virtual machines can usually optimize out these overheads. This approach does make for easier testing, and each step is easier to examine in isolation.

The framework provides a configure function, as seen in the life cycle (see figure 9.3). This gives us the chance to implement any additional custom validation needed. We can also use this method to define our own instance variables. To illustrate this capability, we will implement code to check if the network port defined is the default Redis port. If it isn’t the default port, we log a warning message to remind developers that they need to ensure ports are deconflicted. This code is in the following listing, which contains both our check function and the implementation of the configure function, which first invokes any inherited behavior.

Listing 9.2 Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin/out_redislist.rb

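def check_port(conf)
  log.trace "check_port invoked"
  # Raw values in conf are Strings (nil if absent or if the portNo alias
  # was used), so convert before comparing; otherwise fall back to @port,
  # which super has already populated as an Integer
  port = conf['port'] ? conf['port'].to_i : @port

  if port == RedislistOutput::DefaultPort
    log.info "Default Redis port in use"
  else
    log.warn "Non standard Redis port in use - ensure ports are deconflicted"
  end
end

def configure(conf)
  super
  check_port(conf)
end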

Retrieves the configuration value from the list of configuration attributes using the defined name

We defined a constant in the class for the default port.

This is the method that is invoked as part of the plugin life cycle and triggers our check_port method.

Makes sure any inherited logic is executed first

Invokes our method, passing all the configuration data over
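Following the small-functions approach from the earlier note, the port decision can be pulled out into a pure function that needs neither Fluentd nor Redis to test. This is a minimal sketch; port_log_level and DEFAULT_REDIS_PORT are hypothetical names, and the conversion step reflects the fact that raw values read from the configuration file arrive as strings.

```ruby
DEFAULT_REDIS_PORT = 6379 # hypothetical stand-in for RedislistOutput::DefaultPort

# Decides which log level check_port should use for a given port value.
# Raw configuration values are Strings (or nil when the attribute is absent),
# so normalize to an Integer before comparing with the Integer constant.
def port_log_level(raw_port)
  port = raw_port.nil? ? DEFAULT_REDIS_PORT : Integer(raw_port)
  port == DEFAULT_REDIS_PORT ? :info : :warn
end

puts port_log_level("6379")  # => info
puts port_log_level("16379") # => warn
puts port_log_level(nil)     # => info (falls back to the default)
```

Comparing the raw string "6379" directly with the integer 6379 would never match, which is an easy mistake to make when reading values straight from conf.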

9.6.2 Starting up and shutting down

After configuration, the most critical functions in the life cycle for most plugins are start and shutdown. These are the ideal moments to create or close connections, such as those to Redis. Since establishing connections to storage solutions is often relatively slow, we want to perform this task before we actively start communicating with the remote solution. Redis allows us to define time-outs on the connections and how many reconnection attempts can be made; we have already exposed these as the connection_timeout and reconnect_attempts configuration values in listing 9.1. If the connection fails to be established, we should make this easy to recognize during the plugin life cycle. We can do this by setting the connection to nil and logging the issue.

We need to incorporate the start and shutdown functions shown in the following listing, along with the supporting connect_redis function, into our plugin to achieve the required behavior.

Listing 9.3 Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin/out_redislist.rb

def connect_redis()
  log.trace "connect_redis - Create connection if non existent"
  if !@redis
    begin
      @redis = Redis.new(host: @hostaddr, port: @port,
                         connect_timeout: @connection_timeout,
                         reconnect_attempts: @reconnect_attempts)
      # Redis.new connects lazily, so ping forces the connection now and
      # surfaces any connection error inside this begin/rescue block
      @redis.ping
      log.debug "Connected to Redis " + @redis.connected?.to_s
    rescue Redis::BaseConnectionError, Redis::CannotConnectError => conn_err
      log.error "Connection error - ", conn_err.message,
                " connection timeout=", @connection_timeout,
                " connection attempts=", @reconnect_attempts
      @redis = nil
      return nil
    rescue => err
      log.error "Error connecting to redis - ", err.message, "|", err.class.to_s
      @redis = nil
      return nil
    end
  end
end

def start
  super
  log.trace "starting redis plugin"
  connect_redis()
end

def shutdown
  super
  log.trace "shutdown"
  if @redis
    begin
      @redis.disconnect!
      log.debug "disconnecting from redis"
      @redis = nil
    rescue => err
      log.error "Error closing Redis connection - ", err.message
    end
  end
end

Establishes the Redis connection, taking the connection parameters from the values retrieved from the configuration properties

Handles Redis connection errors separately from the general catchall, as we can guide the user more effectively for these kinds of issues

Establishes a connection to Redis as part of the startup once all the inherited activities are completed

Ensures any inherited tasks are completed

If we still have a Redis connection, then start the disconnection process.

9.6.3 Getting the plugin to work with our Fluentd installation

With enough code to prove we can get our plugin to at least start and stop, we can run a simple test with the full Fluentd. The Fluentd tooling includes extensions to help with unit testing, but it is always rewarding to see code firing up as part of something bigger, particularly when the startup is quick. To do this, we need to ensure Fluentd can pick up our plugin code.

We need a test configuration to be able to run Fluentd. We have prepared one for this job, and it can be retrieved from Chapter9/Fluentd/dummy-plugin.conf. Of course, you might like to choose to develop your own configuration, given everything you’ve learned in the book.

As before, we use the dummy source plugin to generate log events, as their content is not essential. The crucial element is the match configuration, as illustrated in the following listing.

Listing 9.4 Chapter9/Fluentd/dummy_plugin.conf

<match *>
  @type redislist
  portNo 6379

  #<buffer>
  #  flush_interval 120
  #</buffer>
</match>

This is the declaration to use our plugin.

Because our plugin builds upon the Output base class, buffering support (an in-memory buffer by default) becomes available when a <buffer> section is defined. For now, we have left the buffer configuration commented out.

Before starting Fluentd, we need to start the Redis server as we did earlier in the chapter. Eventually, we will want to package and deploy our plugin just like any other Fluentd plugin using the gem tools. But to start with, we do not want the extra effort of deploying and undeploying a gem every time we make a change. To avoid this, we can add the -p parameter to the command line, telling Fluentd to also load plugins from the directory that holds our plugin’s Ruby file. For example, my copy of the plugin directory structure starts at

c:\myDev\GitHub\UnifiedLoggingWithFluentd\Chapter9\fluent-plugin-out-redislist

Then the extended Fluentd command will look as follows:

fluentd -c Chapter9\Fluentd\dummy-plugin-out.conf -p c:\myDev\GitHub\UnifiedLoggingWithFluentd\Chapter9\fluent-plugin-out-redislist\lib\fluent\plugin -vv

Going forward, we will show the path as <plugin absolute path>\Chapter9\fluent-plugin-out-redislist\lib\fluent\plugin, where you need to substitute <plugin absolute path> accordingly.

Combining the configuration file with the extended plugin path means that when we start Fluentd with this command, it will display the configuration as it starts up. With the Redis server running at the command line, we can also see Redis logging the number of connections it has as we start and stop Fluentd.

9.6.4 Putting additional configuration validation into action

Your objective is to restart the Redis server on a different port and create an alternative Fluentd configuration to connect to Redis on a different port. You need to confirm that our configuration check is performing correctly. To restart Redis on a different port, add --port nnnn, where nnnn represents the port number to use with the startup command.

Answer

The modified configuration file can be found at Chapter9\ExerciseResults\dummy-plugin-Answer.conf, where we have changed the port from 6379 to 16379. We also modified the log message generated by the dummy source plugin, although we will not see that yet.

When the Redis server is started, we need to add --port 16379 to override the default. Our Fluentd startup command now becomes

fluentd -c Chapter9\ExerciseResults\Fluentd\dummy-plugin-Answer.conf -p <plugin absolute path>\Chapter9\fluent-plugin-out-redislist\lib\fluent\plugin

When Fluentd starts up, we should see the warning about using a nonstandard port in Fluentd’s log output. The Redis server should still report the connection.

9.6.5 Implementing the Redis output logic

Having proven we can configure, start up, and shut down the plugin, we can move to the next step of implementing the logic of sending the events to Redis. There are several ways the logic can be executed:

  • Synchronous—Process each event by implementing the method def process (tag, es). This is the most straightforward approach and the least performant for execution, as it does not use any buffering.

  • Synchronous buffered—Output is implemented by the method def write (chunk).

  • Asynchronous buffered—Output is implemented by the method def try_write (chunk).

Which implementation method is used is dictated by whether the configuration includes a <buffer> section or not, unless we configure some override to the standard behavior. For the first implementation, we will keep it nice and straightforward with the synchronous model.

Our implementation needs to take the event stream passed as es and iterate through its events. As the stream could contain multiple events, we can make the process a little more efficient by telling Redis to batch the insertion of the events into a single transaction. This is done by calling redis.multi, which tells Redis that the commands that follow should be queued rather than executed immediately. Once we have iterated through the events, we tell Redis to execute the queued commands with redis.exec. As we iterate through the log events, we need to perform the following actions:

  • Build a JSON representation of the log event(s). If you have reviewed the output interface, you will note that there is a predefined formatter function. We have chosen not to override or use this, as we do not want to impact other applications of this method within the plugin’s base classes; we can therefore format the presentation in any desired manner—for example, using msgpack.

  • Perform a Redis list push function.

We should be defensive in our code to handle the scenario of losing the Redis connection before all the events have been committed to Redis.
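Listing 9.5 implements the buffered write(chunk) method. For comparison, the unbuffered process(tag, es) approach described above can be sketched in plain Ruby. This is a minimal sketch under assumptions, not the book’s plugin code: FakeRedis and RedislistProcessSketch are hypothetical names, FakeRedis only mimics the multi, lpush, and exec calls of the redis-rb client so the logic runs without a server, the "tag", "time", and "record" labels are assumed, and the event stream is modeled as an array of [time, record] pairs (a Fluentd event stream’s each method also yields time and record).

```ruby
require "json"

# Hypothetical stand-in for a redis-rb client: commands issued between
# multi and exec are queued and then applied together, as in a Redis transaction.
class FakeRedis
  attr_reader :lists

  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
    @queued = nil
  end

  def multi
    @queued = []
  end

  def lpush(name, value)
    if @queued
      @queued << [name, value] # inside a transaction: queue only
    else
      @lists[name].unshift(value)
    end
  end

  def exec
    @queued.each { |name, value| @lists[name].unshift(value) }
    @queued = nil
  end
end

# Sketch of the synchronous (unbuffered) output logic; the class name is hypothetical.
class RedislistProcessSketch
  def initialize(redis, listname = "fluentd")
    @redis = redis
    @listname = listname
  end

  def process(tag, es)
    # Defensive check: without a connection there is nothing to push to
    raise "Redis connection unavailable" unless @redis

    @redis.multi
    es.each do |time, record|
      entry = { "tag" => tag.to_s, "time" => time.to_i, "record" => record }
      @redis.lpush(@listname, JSON.generate(entry))
    end
    @redis.exec
  end
end

redis = FakeRedis.new
events = [[1606076261, { "blah" => "blah 1" }], [1606076263, { "blah" => "blah 2" }]]
RedislistProcessSketch.new(redis).process("demo", events)
puts redis.lists["fluentd"].last
# => {"tag":"demo","time":1606076261,"record":{"blah":"blah 1"}}
```

Because process runs once per incoming event stream with no buffer in between, any Redis failure surfaces immediately to the caller, which is why the buffered variants are usually preferred for throughput and resilience.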

We need to transform the log event to JSON for potentially three different functions; we should write the logic once and invoke it from the different plugin methods involved. The result of this is two methods, as shown in the following listing.

Listing 9.5 Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin/out_redislist.rb

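# Requires require "json" (alongside require "redis") at the top of the file
def redisFormat(tag, time, record)
  redis_entry = Hash.new
  redis_entry.store(RedislistOutput::TagAttributeLabel, tag.to_s)
  redis_entry.store(RedislistOutput::TimeAttributeLabel, time.to_i)
  # Store the record Hash itself (not record.to_s) so that JSON.generate
  # nests it as a JSON object rather than a Ruby inspect string
  redis_entry.store(RedislistOutput::RecordAttributeLabel, record)
  redis_out = JSON.generate(redis_entry)

  return redis_out
end

def write(chunk)
  log.trace "write:", chunk
  # Be defensive: try to reconnect, and raise if Redis is still unavailable
  # so that Fluentd keeps the chunk and retries the write later
  connect_redis() unless @redis
  raise "Redis connection unavailable" unless @redis

  @redis.multi

  # chunk.metadata.tag is nil unless tag is a buffer chunk key (e.g., <buffer tag>)
  chunk.each do |time, record|
    log.debug "write sync redis push ", chunk.metadata.tag, time, record, @listname
    @redis.lpush(@listname, redisFormat(chunk.metadata.tag, time, record))
  end
  @redis.exec
end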

This is our function that translates the log event into a JSON representation. We need to build our own JSON representation, as we need to capture all the log event attributes.

By using predefined constants, these could be shared with the input plugin.

As the record, event time, and tag represent a flat structure, we can build a simple hash structure and then exploit the prebuilt operations to convert it to JSON. Note we don’t use the formatter method, as this is used by other parts of Fluentd, such as the buffer, and we don’t wish to confuse that logic with a variant representation.

This is one of the standard functions used by an output plugin.

This tells Redis to accept multiple statements that should be executed in a single operation.

This releases the Redis library to send all the statements to the Redis server as a single block.
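To check what ends up in Redis, and what the input plugin will later have to parse, the formatting round trip can be exercised in plain Ruby. This is a minimal sketch; redis_format and redis_parse are hypothetical names, and the label strings "tag", "time", and "record" are assumptions standing in for the TagAttributeLabel, TimeAttributeLabel, and RecordAttributeLabel constants, whose values the listing does not show.

```ruby
require "json"

# Assumed label values; the plugin's constants may use different strings.
TAG_LABEL = "tag"
TIME_LABEL = "time"
RECORD_LABEL = "record"

# Mirrors redisFormat: a flat Hash converted to a JSON String.
def redis_format(tag, time, record)
  JSON.generate({ TAG_LABEL => tag.to_s, TIME_LABEL => time.to_i, RECORD_LABEL => record })
end

# What a consumer such as the input plugin does with a popped entry.
def redis_parse(entry)
  data = JSON.parse(entry)
  [data[TAG_LABEL], data[TIME_LABEL], data[RECORD_LABEL]]
end

json = redis_format("dummy", 1418599418, { "hello" => "world", "counter" => 1 })
puts json
# => {"tag":"dummy","time":1418599418,"record":{"hello":"world","counter":1}}
tag, time, record = redis_parse(json)
puts record["counter"] # => 1
```

Passing the record Hash itself lets JSON.generate nest it as a JSON object, so the consumer gets a structured record back rather than a string it would have to parse a second time.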

9.6.6 Putting the testing of synchronous output into action

We have reached a state where we can confirm that we can write to Redis. Using the previously illustrated approach, restart Fluentd and use the illustrated Redis commands to review the list in Redis and pop entries in the list.

Answer

In repeating the Fluentd test, with the write method now in place, you should expect the Redis commands to show the list structure to grow with JSON content, looking something like this:

{"tag":"dummy","time":1418599418,"record":{"hello":"world","counter":1}}

The log events as they are generated will increment the counter value due to the source configuration. This will mean there should be a correlation between the counter attribute values and the Redis list length. This can be confirmed with the llen command in the Redis CLI. You will also be able to pop the entries from the list using the command lpop fluentd, as the configuration will allow the default list name to be used.

9.7 Implementing the Redis input plugin

Before implementing one of the other write methods, let’s complete the circuit with the input plugin. We can use the same utility that generated the output plugin to generate the skeleton folders and files for the input side. We rerun the utility exactly as before, except that we specify the plugin type as input rather than output. As a result, the generated code extends a different base class (as we saw in figure 9.1), so we need to implement some different functions.

For Redis, the input plugin is effectively a polling activity, as most solutions don’t support callbacks or webhooks (although it is worth noting that Redis does offer push-style mechanisms such as Pub/Sub and keyspace notifications). This means we need a configuration value that defines how frequently the plugin polls Redis. As with the output plugin, we need the information necessary to connect to Redis; for this, we can copy the code written for the output plugin. While this does not follow the DRY (don’t repeat yourself) principle, there are plenty of opportunities to improve our code later.

Although there are some additional values to consider, the input plugin processes its configuration attributes just as the output plugin does. The two key methods to implement in the input plugin are run and emit (as shown in listing 9.6). The run method provides our scheduling loop, which executes in its own thread. The emit method calls Redis and emits the log events to the next step in Fluentd, as defined by the configuration file.

As this is a source plugin, the events we emit would normally carry the current time as their timestamp and the tag configured for the plugin. Do these values make sense, given that they do not truly reflect when and where the original event occurred? To address this, we provide the means to determine whether the original tag and time are added to the event record or used in the core log event. It is best to let the person configuring the plugin decide what should be replaced. If the values are inserted into the log event record, we must determine which attribute names to use. We can address this with the same mechanism we already use for capturing plugin configuration attributes.
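To make these options concrete, the following is a minimal plain-Ruby sketch of the decision logic, applied to an already parsed Redis entry. The method name resolve_event and its keyword arguments are hypothetical (they only mirror the plugin's setting names), and in a real plugin the time passed to Fluentd would need to be a Fluent::EventTime or an integer rather than the strings used here.

```ruby
# Hypothetical sketch: option names mirror the plugin's settings
# (use_original_tag, add_original_time_name, ...), but this is plain Ruby.
def resolve_event(data, default_tag:, now:, use_original_tag: false,
                  use_original_time: false, add_original_tag_name: nil,
                  add_original_time_name: nil)
  # Replace the core tag/time with the originals only when asked to
  tag = use_original_tag ? data["tag"] : default_tag
  time = use_original_time ? data["time"] : now

  # Work on a copy so the popped payload is left untouched
  record = data.fetch("record").dup

  # Optionally carry the originals inside the record, without overwriting
  # an attribute that already uses the configured name
  if add_original_tag_name && !record.key?(add_original_tag_name)
    record[add_original_tag_name] = data["tag"]
  end
  if add_original_time_name && !record.key?(add_original_time_name)
    record[add_original_time_name] = data["time"]
  end

  [tag, time, record]
end

data = { "tag" => "dummy", "time" => "2014-12-14 23:23:38",
         "record" => { "hello" => "world" } }
tag, time, record = resolve_event(data, default_tag: "redislist", now: "NOW",
                                  add_original_tag_name: "original_tag")
```

Keeping the record as a Hash makes the "already has this attribute" check a simple key lookup rather than a string search.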

Listing 9.6 Chapter9/fluent-plugin-redislist/lib/fluent/plugin/in_redislist.rb

def emit
  log.trace "emit triggered"
  if !@redis                                                                
    log.debug "reconnecting Redis ",@hostaddr,":",@port
    connect_redis()  
  end
 
  if @redis
    keep_popping = true                                                     
 
    while keep_popping
      if (@fifo)                                                            
        popped = @redis.rpop(@listname)
      else
        popped = @redis.lpop(@listname)
      end
   
      log.debug "Popped",@listname, ": ", popped
      if popped
        data = JSON.parse(popped)

        if (@use_original_time)
          time = data[TimeAttributeLabel]
        else
          time = Fluent::EventTime.now
        end
 
        if (@use_original_tag)
          tag = data[RedislistInput::TagAttributeLabel]
        else
          tag = @tag
        end
 
        data_record = data.fetch(RecordAttributeLabel).to_s
        log.debug "original data record=>",data_record
 
        # Only inject the original time if the record lacks that attribute name
        if @add_original_time &&
         !data_record.include?('"' + @add_original_time_name + '"')
          data_record= inject_original_value(data,data_record,
           RedislistInput::TimeAttributeLabel, @add_original_time_name)   
        end
 
        if @add_original_tag &&
         !(data_record.include? '"'+@add_original_tag_name+'"')
          data_record = inject_original_value(data,data_record,
           RedislistInput::TagAttributeLabel,@add_original_tag_name)
        end
 
        log.debug "Emitting -->", tag," ", time, " ", data_record
        router.emit(tag, time, data_record)                                 
      else
        keep_popping = false
      end
    end
  else
    log.warn "No Redis - ", @redis
  end
end
 
def run
  log.trace "run triggered"
  while thread_current_running?                                             
    current_time = Time.now.to_i
 
    emit() if thread_current_running?
    while thread_current_running? && Time.now.to_i <= current_time
      sleep @run_interval                                                   
    end
  end
end

Determines whether a new connection is required

Sets up the loop controller so we keep popping entries from Redis until the list is empty

Redis allows us to treat a list as first in, first out (FIFO) or last in, first out (LIFO). So we can use this configuration to control whether we want to operate the list in a FIFO or LIFO manner.

Determines whether the original tag and date-time values should replace those of the new log event or simply be incorporated into its log event record.

This is when we tell Fluentd to pass the log event onto the next step of the process based on the configuration definition.

The run method keeps looping for as long as its thread is allowed to run.

Once we’ve decided the thread has legitimately been woken since the last cycle, we can use emit to send all the log events.
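The FIFO/LIFO choice follows from where entries are pushed and popped. The sketch below uses a hypothetical in-memory stand-in (FakeRedis, not the redis gem) whose lpush, lpop, and rpop mirror the list semantics, together with a drain loop shaped like the emit method's keep-popping loop. Because the output plugin pushes with LPUSH (newest at the head), RPOP returns the oldest entry first.

```ruby
# Hypothetical in-memory stand-in for Redis lists; method shapes mirror
# redis-rb's lpush/lpop/rpop but nothing here talks to a real server.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH inserts at the head of the list and returns the new length
  def lpush(key, value)
    @lists[key].unshift(value)
    @lists[key].length
  end

  # LPOP removes from the head: the newest entry when writers use LPUSH
  def lpop(key)
    @lists[key].shift
  end

  # RPOP removes from the tail: the oldest entry when writers use LPUSH
  def rpop(key)
    @lists[key].pop
  end
end

# Keep popping until the list is empty, as the emit loop does
def drain(redis, listname, fifo:)
  drained = []
  while (popped = fifo ? redis.rpop(listname) : redis.lpop(listname))
    drained << popped
  end
  drained
end

redis = FakeRedis.new
%w[e1 e2 e3].each { |event| redis.lpush("fluentd", event) }
p drain(redis, "fluentd", fifo: true) # prints ["e1", "e2", "e3"]
```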

9.7.1 Testing input and output plugin execution

With our input plugin implemented, we can perform a simple test. We could easily incorporate both the input and output plugins into a single configuration (see Chapter9/fluentd/dummy-plugin.conf), but the log information for input and output would then intermingle. We would also need to extend the plugin path parameter to include both plugins, like this:

fluentd -c Chapter9/fluentd/dummy-plugin.conf -p <plugin absolute path>/Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin -p <plugin absolute path>/Chapter9/fluent-plugin-redislist/lib/fluent/plugin

Alternatively, we can start two instances of Fluentd, each with its own configuration, so each process runs one of our plugins. This makes it a lot easier to see what is happening. To do this, repeat the steps previously used to see the output plugin at work. Then, in another console window, adapt the command to reference our input plugin path and the Chapter9/Fluentd/dummy-plugin-in.conf file that we have already prepared. The result should look like this:

fluentd -c Chapter9/Fluentd/dummy-plugin-in.conf -p <plugin absolute path>/Chapter9/fluent-plugin-redislist/lib/fluent/plugin

With everything running, one console should show messages for the log events being added, another should show them being removed, and the Redis console should display both interactions: adding to and removing from the list.

With the ability to write and consume log events through a Redis list now in place, let us go back to the output plugin and extend it with alternative ways of working, such as using the buffer.

9.8 Extending output with buffering

As we saw in chapter 4, if the plugin supports a buffer, the I/O process can be optimized. We have already done some performance optimization by configuring the synchronous process to bunch the Redis push operations together, so the Redis connector executes them all at once if we receive more than one log event. But we can further accelerate the process by using the buffer to process larger groups into a single transaction in Redis.

As we saw in chapter 4, Fluentd provides two types of buffer out of the box: file-based and memory-based. Supporting a file buffer does not make sense when our target is an in-memory solution that performs better than physical storage such as disks. This means our solution should allow only the memory buffer.

In figure 9.4, we can see how the log event passes through the different paths of the base Fluentd output class and the functions that need to be implemented, depending on whether buffering is used and whether buffering is synchronous or asynchronous in nature.

Figure 9.4 The different methods involved in outputting log events from an output plugin depending on the use and type of buffering

As figure 9.4 shows, to keep things simple, we started with the synchronous, unbuffered path so that we had working plugins for input and output; now we revisit the output plugin to extend it for the buffered use case.

The asynchronous path is largely the same as the synchronous path regarding how we interact with Fluentd. But we have to manage the additional logic that makes the behavior asynchronous and uses the buffer chunk data structures. With the buffering, we could offer either or both approaches to handling the buffer chunks. The options include the following:

  • Each chunk is controlled by the number of log events it can contain. This is a simple mechanism and very efficient and predictable when log events are consistent in size. But if log events are variable in size, you can exhaust the available memory before filling a chunk.

  • Each chunk is controlled by allocating the same amount of memory. With the size model, we have the responsibility to implement the logic around calculating the size of the log event, determining if the log event can fit into the current chunk or needs to be put into another chunk. Also, we must decide what to do if the log event is larger than the chunk size limit.

    The benefit of this model is that if log events vary in size, we don’t risk memory exhaustion, as we’ve capped the amount of resources that will be used. The additional code complexity here is less about how Fluentd plugins are written and more about understanding Ruby and how it works with data structures.
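The difference between the two chunking models can be sketched in plain Ruby, independent of Fluentd's buffer classes. The count model corresponds to Fluentd's chunk_limit_records setting and the size model to chunk_limit_size; the method names below are hypothetical, and measuring size as the JSON byte length, as well as giving an oversized event a chunk of its own, are assumptions chosen to illustrate the extra decisions the size model forces on us.

```ruby
require "json"

# Count model: each chunk holds at most `limit` events
def chunk_by_count(events, limit)
  events.each_slice(limit).to_a
end

# Size model: each chunk holds at most `limit_bytes` of serialized data.
# Assumed policy: an event larger than the limit gets a chunk of its own.
def chunk_by_size(events, limit_bytes)
  chunks = []
  current = []
  current_size = 0
  events.each do |event|
    size = JSON.generate(event).bytesize
    # Close the current chunk if this event would push it over the limit
    if !current.empty? && current_size + size > limit_bytes
      chunks << current
      current = []
      current_size = 0
    end
    current << event
    current_size += size
  end
  chunks << current unless current.empty?
  chunks
end

small = { "m" => "a" }         # 9 bytes as JSON
big = { "m" => "x" * 100 }     # 107 bytes as JSON
events = [small, small, big, small]
p chunk_by_count(events, 2).map(&:length) # prints [2, 2]
p chunk_by_size(events, 20).map(&:length) # prints [2, 1, 1]
```

Note that the count model happily puts the 107-byte event alongside another one, which is exactly the memory risk the chapter leaves to the user.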

In the spirit of “keep it simple, stupid” (KISS) and focusing on Fluentd, we will implement the number-of-log-events-per-chunk model. The responsibility for understanding the log events and avoiding the risk of memory exhaustion then rests with the user. With our approach worked out, we should make things easy for the user by defaulting the buffer settings to the values we want. To do so, we can include the following code fragment in the configuration definitions:

      config_section :buffer do
        config_set_default :@type, 'memory'
        config_set_default :chunk_keys, ["tag"]
        config_set_default :chunk_limit_records, @chunksize
      end

As you can see, we have defined a chunk size configuration attribute, which itself has a default value. As a result, the plugin buffers using these defaults unless the configuration explicitly overrides them. We should also extend the configure method to handle being supplied with configuration values that we do not recommend or support, such as those intended for the chunk-size model of buffering.
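As a minimal sketch of that extra validation, the following hypothetical check rejects a non-memory buffer type and the size-based chunk limit. In a real plugin, the check would sit in configure after calling super, read the parsed buffer section, and raise Fluent::ConfigError; here a plain Hash and a stand-in error class keep the sketch runnable without Fluentd.

```ruby
# Stand-in for Fluent::ConfigError so the sketch runs without Fluentd
class ConfigError < StandardError; end

# Hypothetical validation of the buffer section's settings
def validate_buffer_settings(buffer)
  # Only the memory buffer makes sense in front of an in-memory store
  if buffer.fetch("@type", "memory") != "memory"
    raise ConfigError, "only the memory buffer is supported"
  end
  # The size-based chunk model is not supported by this plugin
  if buffer.key?("chunk_limit_size")
    raise ConfigError, "use chunk_limit_records; chunk_limit_size is not supported"
  end
  true
end

validate_buffer_settings({ "@type" => "memory", "chunk_limit_records" => 100 })
```

Failing fast at configuration time gives the user a clear message instead of surprising behavior once events start flowing.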

The next step is implementing the write method, which receives a completed buffer chunk and iterates over it to build up the Redis push calls. We can reuse the previous logic to generate the representation we want to store in Redis. Just as with the synchronous, unbuffered path, we wrap the loop through the chunk with instructions telling the Redis connector to batch up all the Redis commands and execute them in one go, using the redis.multi and redis.exec functions. The new write method is shown in the following listing.

Listing 9.7 Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin/out_redislist.rb

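def write(chunk)
  log.trace "write:", chunk

  @redis.multi
  chunk.each do |time, record|
    log.debug "write buffered redis push ", chunk.metadata.tag, time,
     record, @listname
    @redis.lpush(@listname, redisFormat(chunk.metadata.tag, time, record))
  end
  @redis.exec
rescue Redis::BaseConnectionError => e
  # Connection problems get their own message so the user can check host/port;
  # re-raising lets Fluentd keep the chunk and retry it later
  log.error "Cannot reach Redis at ", @hostaddr, ":", @port, " - ", e.message
  raise
rescue StandardError => e
  # General catchall: abandon any open MULTI so the next write starts cleanly
  @redis.discard rescue nil
  log.error "Failed to write chunk to Redis - ", e.message
  raise
end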

The base class logic calls this method, and we provide our own implementation. Unlike methods such as configure, we must not call super here, as the base class version raises a NotImplementedError.

Instructs the Redis connector to group all the Redis statements that follow

The chunk has a different structure from the event stream provided in the unbuffered path, so we need a different loop. This iterates over each entry in the chunk.

As before, we need to take the log event, time, and tag to build a representation to be used in Redis.

Handles Redis connection errors separately from the general catchall, as we can guide the user more effectively for these kinds of issues
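The effect of wrapping the loop in multi and exec can be illustrated with a hypothetical in-memory stand-in (FakeTransactionalRedis, not the redis gem): between multi and exec, pushes are only queued, and exec applies them all together. A real Redis server also guarantees that the queued commands run atomically, which the stand-in does not attempt to model. The write_chunk method mirrors the shape of listing 9.7, using an array of [time, record] pairs in place of a Fluentd buffer chunk.

```ruby
require "json"

# Hypothetical stand-in showing MULTI/EXEC's deferred execution only
class FakeTransactionalRedis
  attr_reader :lists

  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
    @queued = nil
  end

  # Start a transaction: later commands are queued instead of executed
  def multi
    @queued = []
  end

  def lpush(key, value)
    if @queued
      @queued << [key, value]
    else
      @lists[key].unshift(value)
    end
  end

  # Apply every queued command together, leave transaction mode,
  # and return how many commands were executed
  def exec
    queued = @queued || []
    @queued = nil
    queued.each { |key, value| @lists[key].unshift(value) }
    queued.length
  end
end

# Same shape as the buffered write; chunk is an array of [time, record] pairs
def write_chunk(redis, listname, tag, chunk)
  redis.multi
  chunk.each do |time, record|
    redis.lpush(listname, JSON.generate({ "tag" => tag, "time" => time, "record" => record }))
  end
  redis.exec
end

redis = FakeTransactionalRedis.new
write_chunk(redis, "fluentd", "dummy", [[1, { "n" => 1 }], [2, { "n" => 2 }]])
```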

We can rerun our test with a slightly different configuration to ensure we use the buffering behavior with this method introduced. This can be done with the following command:

fluentd -c Chapter9/Fluentd/dummy-plugin.conf -p <plugin absolute path>/Chapter9/fluent-plugin-out-redislist/lib/fluent/plugin -vv

With the test configuration provided, we should see the write method’s trace statements appear periodically. The trace output reaches stdout in short bursts, as the write method logs the details of each event while it calls the Redis connector.

9.8.1 Improving our scenario by putting maintainability into action

Our proof-of-concept implementation of the Redis list plugin has shown how we can deliver a new plugin. During development, it became apparent that the input and output code have a lot in common. As a result, we have received the go-ahead to make some improvements, and the first goal is to refactor the input and output plugins to use a common base class.
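One possible starting point, sketched under assumptions: because each plugin must already inherit from its own Fluentd base class (input or output), Ruby's single inheritance means the shared code may fit better in a mixin module than in a new base class. The module name RedislistCommon, the port_warning method, and the two stand-in classes are hypothetical; the warning text matches the one checked by the unit test in listing 9.8.

```ruby
# Hypothetical shared module for code both plugins currently duplicate.
# A mixin is used because each real plugin already extends a Fluentd class.
module RedislistCommon
  DEFAULT_REDIS_PORT = 6379

  # Returns a warning message for a non-standard port, or nil otherwise
  def port_warning(port)
    return nil if port == DEFAULT_REDIS_PORT

    "Non standard Redis port in use - #{port}"
  end
end

# Stand-ins for the two plugin classes
class RedislistInputSketch
  include RedislistCommon
end

class RedislistOutputSketch
  include RedislistCommon
end

puts RedislistOutputSketch.new.port_warning(24229)
# prints Non standard Redis port in use - 24229
```

The same module could later hold the connect_redis logic, so a connection fix is made once for both plugins.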

9.9 Unit testing

The testing we have done so far is a manual process that we all know is not the best. In the real world, we would lead with unit testing and build up from there. Ideally, changes in the code should trigger a continuous integration and continuous delivery process, automatically running the unit tests and end-to-end testing.

We will not go into depth here, as unit testing is primarily an aspect of Ruby development rather than Fluentd. The Fluentd team has built some support libraries that can be used with any of the major unit test frameworks, including test-unit (https://test-unit.github.io/), RSpec (https://rspec.info/), and minitest. Our example uses test-unit, as it is a well-adopted framework and feels like many other major unit test frameworks, such as NUnit and JUnit.

When we used the utility to generate the plugin skeleton and the main Ruby code, the tool also generated a test folder structure in the plugin’s base folder, including a skeleton test class to help us get started. The test file has the same name as the plugin file but with a prefix of test_. There is also a small piece of helper code (helper.rb) at the base of the test path, which loads the framework’s helper code into the test-unit tool.

To illustrate the possibilities, we have built a couple of tests for the output plugin, as illustrated in listing 9.8. These tests focus on validating the configuration-related logic that drives the behavior of our plugins. This is achieved using part of the Fluentd test framework that implements different types of drivers. The type of driver needed is dictated by the plugin type and mimics the core of Fluentd. We can trigger the necessary operations using the driver, including feeding in log events to the plugin. The driver also provides the means to retrieve and evaluate the results, such as how many times events have been through specific stages (e.g., emit, write). The evaluation can be done using the driver to access and examine the processed events, such as the output of the synchronous and asynchronous write operations, and to simply know how many events have been handled.

The driver capability can be extended to handle the impact of scheduled activities, such as events accumulating in a buffer. The test utilities can also capture standard out so the text can be examined, which allows you to verify that the expected log output is being generated. We apply this technique in our advanced config test. If Fluentd writes its log to a file rather than to stdout, that file can also be interrogated to see how much log information is generated and whether specific log events have occurred.
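The idea behind capturing standard out can be shown in a few lines of plain Ruby: temporarily swap $stdout for a StringIO, run the block, and return what was written. The name capture_stdout_sketch is hypothetical; the tests in this chapter use the capture_stdout helper available in the test environment rather than this code.

```ruby
require "stringio"

# Minimal sketch of stdout capture: swap $stdout for a StringIO while the
# block runs, then restore the original stream even if the block raises
def capture_stdout_sketch
  original = $stdout
  $stdout = StringIO.new
  yield
  $stdout.string
ensure
  $stdout = original
end

output = capture_stdout_sketch do
  puts "Non standard Redis port in use"
end
```

The ensure clause matters: without it, a failing assertion inside the block would leave all later test output swallowed.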

Listing 9.8 Chapter9/fluent-plugin-out-redislist/test/plugin/test_out_redislist.rb

test 'advanced config' do
  conf = %[
    host 127.0.0.1
    port 24229
  ]

  d = nil # declared outside the block so it is still in scope for shutdown
  captured_string = capture_stdout do
    d = create_driver(conf)
    assert_equal 24229, d.instance.port
    assert_equal '127.0.0.1', d.instance.hostaddr
  end

  assert_true(captured_string.include?("Non standard Redis port in use"))

  d.shutdown
end

Declares the unit test

Creates a set of configuration values to be passed

Defines a variable to capture any stdout generated within the following statement block

Creates the driver using the test config provided

Tests the port and host address values set in the plugin

Evaluates whether the warning about the non-standard port has been produced

Shuts down everything cleanly

To execute the unit tests, we simply need to follow the unit test framework’s guidance. In the case of test-unit, that comes down to using Ruby to execute the unit test file. For example:

ruby Chapter9/fluent-plugin-out-redislist/test/plugin/test_out_redislist.rb

9.10 Putting the development of unit tests into action

Previously, we identified the need to test different configurations through running Fluentd with alternative configuration files. These should be replaced with unit tests. As the output plugin has restricted how we can use buffering, we need to further test configuration handling. This is best done with unit tests for the configure and write functions.

9.10.1 Answer

Within the Chapter9/ExerciseResults directory, we have included two child directories called test-out and test-in. These contain the directory structures and files with additional unit tests covering the configure and other operations that you can compare against.

9.11 Package and deployment

Having completed testing, we can think about packaging and deploying our plugins. This includes preparing the metadata files and documentation.

9.11.1 Documentation

Part of packaging up a solution includes providing the licensing information and documentation for the plugin. The template utility will have provided a standard license document and a basic README. The most important thing here is to ensure that the readme is clear and complete. Like any good product, the plugin’s ability to be successfully used is predicated on people understanding how to use it, so good documentation will make a meaningful difference. In the download pack (and GitHub repository), we have provided a separate completed readme, so the initial generated state (readme.md) and final state (readme-final.md) can be compared. You may notice in the readme content some instructions for helping to complete the Gemfile, which we will come to in a moment.

The task of generating the documentation for the plugins is greatly simplified by another Fluentd-provided utility. The fluent-plugin-config-format utility takes as its parameters the plugin type (just like the utility that created the skeleton for us) and the plugin name. We can then tell the utility which format to generate the documentation in. In the following example, we have used markdown, which GitHub and similar Git repository services render nicely; other options include plain text and JSON. As we want to generate the documentation from the source, we need to provide the path to the Ruby plugin code. Running the utility with the following parameters produces the documentation for the plugin’s configuration:

fluent-plugin-config-format output redislist -f markdown -p lib/fluent/plugin/

The utility sends its result to the console rather than to a file, so we need to cut and paste the output into our readme.md file (or redirect the output into the file using the shell). We have incorporated the output into the readme-final.md file.

USING RDOC OR YARD

In the directory structure shown in figure 9.2, we had a doc folder that was not generated by the Fluentd utility. It comes from running RDoc or YARD on the code, which generates developer-level documentation. In this case, we have opted to use YARD, as it provides some neat features beyond standard RDoc. To find out more about YARD and how to install it, see appendix E. Note that if you prefer to stick with RDoc, the metadata tags YARD uses will appear as plain text in the generated output.

With YARD installed, maintaining this documentation comes down to looking after the code comments and running the following commands from the root folder of the plugin:

yard doc lib/fluent/plugin/out_redislist.rb
yard doc lib/fluent/plugin/in_redislist.rb

This will update the documentation for you, but it focuses on general Ruby code and comments, not the specifics relating to Fluentd, such as the configuration values and their parameters.
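For illustration, here is what YARD-style comments look like on a small method. The pop_next helper is hypothetical (it is not part of the chapter's plugin code); it works with any object that responds to lpop and rpop, such as a redis-rb client. The @param, @return, and @example tags are standard YARD tags that yard doc turns into structured documentation.

```ruby
# Pops the next entry from a Redis list.
#
# @param redis [#lpop, #rpop] a Redis client, or anything with the same methods
# @param listname [String] the name of the Redis list to read
# @param fifo [Boolean] true to read oldest-first (RPOP), false for newest-first (LPOP)
# @return [String, nil] the popped entry, or nil when the list is empty
# @example Read the oldest entry from the default list
#   pop_next(redis, "fluentd", fifo: true)
def pop_next(redis, listname, fifo: true)
  fifo ? redis.rpop(listname) : redis.lpop(listname)
end
```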

9.11.2 Complete metadata aka manifest

As directed by the default content of the README, the Gemfile needs to be updated with the name of the gem we want to create. The entry takes the form gem <name of the gem>, for example, gem "fluent-plugin-out-redislist" for our output plugin.

The gemspec file also needs to be completed with additional information, including the summary, description, home page, license, contact details, and version (Semantic Versioning format is expected; https://semver.org). It also needs to include details of any dependencies. During setup, we installed several additional gems, such as the Redis connector, so we need to add these dependencies to the gemspec file so they are retrieved when the plugin is installed:

spec.add_runtime_dependency "redis", "~>4.0"

This indicates that the Redis gem is needed at version 4.0 or later, but below 5.0: the ~> (pessimistic) operator allows only the last specified version segment to increase. The gemspec standard allows more complex rules to be defined for which versions can be used.
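A gemspec is itself Ruby, so both the metadata and the version rule can be checked with RubyGems directly. The field values below are placeholders rather than the chapter's actual metadata; the Gem::Requirement checks show which Redis gem versions the "~> 4.0" constraint accepts.

```ruby
require "rubygems"

# Sketch of the gemspec fields discussed above; values are placeholders
spec = Gem::Specification.new do |s|
  s.name = "fluent-plugin-out-redislist"
  s.version = "0.1.0" # Semantic Versioning
  s.summary = "Fluentd output plugin that pushes log events to a Redis list"
  s.authors = ["Your Name"] # placeholder
  s.license = "Apache-2.0" # placeholder
  s.files = Dir["lib/**/*.rb"]
  s.add_runtime_dependency "redis", "~> 4.0"
end

# "~> 4.0" means >= 4.0 and < 5.0
requirement = Gem::Requirement.new("~> 4.0")
%w[3.9.0 4.0.0 4.8.1 5.0.0].each do |version|
  puts "#{version}: #{requirement.satisfied_by?(Gem::Version.new(version))}"
end
# prints 3.9.0: false, 4.0.0: true, 4.8.1: true, 5.0.0: false (one per line)
```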

9.11.3 Building the gem package

Once the metadata is complete, we can build the package with the RubyGems tooling that we installed previously, which uses the gemspec to create the final package. But be careful where you execute the command from (the plugin’s root folder), as the gemspec file includes scripting that uses relative paths to locate all the files that need to be included. We can switch on verbose mode with the -V parameter to make it easy to see what is going on. To create the gem file, we use the following command:

gem build fluent-plugin-redislist.gemspec -V

With the gem file created, we need to install it into our local library of gems. Once that is done, we can use the gem without providing the path to the actual Ruby code. This can be done with the following command:

gem install ./fluent-plugin-out-redislist-0.1.0.gem

We can confirm that the gem is in place by executing the command

gem search -l redis

This should yield a list including our gem.

9.11.4 Rerun without the plugin paths

With the plugins built and the gems created and installed, we can rerun our test scenario. This time, the test runs without referencing the Ruby code directly, because the code is available through the gems we created; make sure you don’t accidentally include the -p parameter in the Fluentd command line. As a result, we can see our plugins working while the execution commands reflect the conventional way of working we would see in production.

9.12 Extending to be an enterprise-class solution

The plugin development has been successful and has met all our requirements. But it doesn’t yet reach the level at which we could call the solution suitable for enterprise-class use cases. To bring things up to a suitable standard, we recommend some changes:

  • Credentials used on the Redis connection—configure the plugin to optionally use an authenticated connection with Redis. This should be done to allow the credentials to be injected from a secure source, such as Vault.

  • Using an authenticated connection to Redis means passing credentials, and we do not want to do that over an unencrypted connection with the credentials in cleartext. So, the connection to Redis should be secured with SSL/TLS certificates.

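  • One of the features of Redis that makes it such a good cache solution is its support for TTL (time to live). By incorporating TTL, we can control the size of the Redis buffer; if consumers cannot keep up, events simply expire. Note that Redis applies expiry to whole keys rather than to individual list entries, so the design needs to take this into account.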

  • As events are handled, some basic stats could be generated that could potentially be consumed by Prometheus.

  • Extend and enhance the unit testing to achieve a target of greater than 60% coverage.

  • Develop the prebuilt Rakefile to include and execute all the unit tests. Also, incorporate activities such as

    • Code analysis with a linter.
    • Maintaining the additional documentation using RDoc or YARD. Currently, the doc folder is generated using YARD.
    • Generating fluent-plugin-config-format output and determining if there have been any changes since the last build.
  • Incorporate the process into a continuous integration tool suite.

  • Consolidate the documentation into readme.md rather than keeping a separate readme-final.md.
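As a starting point for the Rakefile suggestions above, here is a minimal sketch under several assumptions: RuboCop as the linter, YARD for the developer docs, and a hypothetical committed file config-docs/out_redislist.md holding the fluent-plugin-config-format output so that a build can detect changes. The generated Rakefile typically already defines a similar test task.

```ruby
require "rake"
require "rake/testtask"

# Run every test/**/test_*.rb file with test-unit
Rake::TestTask.new(:test) do |t|
  t.libs.push("lib", "test")
  t.test_files = Rake::FileList["test/**/test_*.rb"]
  t.verbose = true
end

desc "Static analysis with RuboCop (assumed linter choice)"
task :lint do
  sh "rubocop lib test"
end

desc "Regenerate the developer documentation with YARD"
task :doc do
  sh "yard doc lib/fluent/plugin/out_redislist.rb"
end

# Hypothetical location of the committed configuration documentation
CONFIG_DOC = "config-docs/out_redislist.md"

desc "Fail if the configuration documentation no longer matches the code"
task :config_doc_check do
  generated = `fluent-plugin-config-format output redislist -f markdown -p lib/fluent/plugin/`
  committed = File.exist?(CONFIG_DOC) ? File.read(CONFIG_DOC) : ""
  abort "Configuration documentation is out of date" unless generated == committed
end

task default: %i[lint test doc config_doc_check]
```

Running rake with no arguments then performs the whole sequence, which is also the single command a continuous integration tool would need to call.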

If you wish to develop your plugin development skills or use this as a practical opportunity to develop your Ruby development skills, we would encourage you to try implementing these suggested improvements. As you won’t be following our steps when implementing these features, we would recommend that you adopt your preferred development tool(s). We do not have a reference solution to this, but running the test scenarios as you implement your solution to this exercise will confirm a successful outcome functionally.

Summary

  • Fluentd plugin tooling provides the means to create the skeleton code to develop both input and output plugins.

  • Fluentd’s output plugin skeleton supports both synchronous and asynchronous buffered output functions.

  • Fluentd input plugins need to be able to set the time and tag details. The input plugin implements this logic.

  • The plugin framework includes defining, creating, and defaulting the configuration properties for the plugin loaded from the configuration file.

  • Fluentd plugins are typically made available as RubyGems. The utilities provided by Ruby and Fluentd make this easy to achieve.
