One of the awesome things about Logstash is that there are so many ways to get log events into it, manipulate and filter events once they are in and then push them out to a whole variety of destinations. Indeed, at the time of writing, there were nearly 100 separate input, filter and output plugins. Every now and again though you encounter a scenario where you need a new plugin or want to customize a plugin to better suit your environment.
Now our project has almost reached its conclusion we've decided we better learn how to extend Logstash ourselves to cater for some of the scenarios when you need to modify or create a plugin.
Since Logstash 1.5.0 plugins have been shipped as Ruby Gems. As we've seen earlier in the book a lot of plugins are shipped with the Logstash package. Others are available from the Logstash plugins GitHub account. You can use the logstash-plugin
binary to install these.
To construct our own plugins we can use some simple scaffold code that the Logstash team provides for each plugin type:
We can copy these sample plugins and create a new plugin of our own from the template. Let's quickly look at the plugin template's structure.
$ tree logstash-input-pluginname
├── Gemfile
├── LICENSE
├── README.md
├── Rakefile
├── lib
│ └── logstash
│ └── inputs
│ └── pluginname.rb
├── logstash-input-pluginname.gemspec
└── spec
└── inputs
└── pluginname_spec.rb
We can see that it looks like a pretty typical Ruby Gem. The core of our plugin is contained in the lib/logstash
directory, inside a directory named for the type of plugin being developed: input
, filter
, or output
. We also have a spec
directory to hold any tests for the plugin. Rounding out our template are a README
, license and a Rakefile
to help us automate our plugin's build.
We've also got a .gemspec
or Gem specification file that helps us build our plugin.
Gem::Specification.new do |s|
s.name = 'logstash-input-example'
s.version = '0.1.2'
s.licenses = ['Apache License (2.0)']
s.summary = "This example input streams a string at a definable interval."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Elastic"]
s.email = '[email protected]'
s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
s.require_paths = ["lib"]
# Files
s.files = `git ls-files`.split($)
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})
# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }
# Gem dependencies
s.add_runtime_dependency 'logstash-core', '>= 1.4.0', '< 2.0.0'
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud'
s.add_development_dependency 'logstash-devutils'
end
XXX
This file, and the s.metadata
line specifically, configures the Gem as a Logstash plugin. Also important is the s.version
field which tells Logstash the version of the plugin. This replaces the previously-used milestone
method in older plugins.
The varying versions you specify produce logging output warnings or status that tell people about the maturity of your plugin. Versions produce the following results:
You would also specify any Gem or library dependencies in the Gemspec.
logstash-core
gem and a development dependency on the logstash-devutils
gem.
Let's look at one of the more basic plugins, the stdin
input, and see what we can learn about plugin anatomy. You can see the full plugin code here but let's look at some key pieces.
A plugin starts with a series of require
statements that include any supporting code.
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "concurrent/atomics"
require "socket" # for Socket.gethostname
Firstly, each plugin requires the Logstash base class for the type of plugin, here logstash/inputs/base
. We also require the base Logstash class, logstash/namespace
.
We also include any prerequisites, in this case the stdin
input requires the Socket library for the gethostname
method and the concurrent
gem to provide some concurrency helpers. In the case of the concurrent
gem we'd also add that as a dependency in our Gem specification file to ensure it is added when the plugin is built.
s.add_runtime_dependency 'concurrent-ruby'
We then create a class and inherit the LogStash::Inputs::Base
class we required above. For filters we would require the LogStash::Filters::Base
class and outputs the LogStash::Outputs::Base
class respectively.
class LogStash::Inputs::Stdin < LogStash::Inputs::Base
config_name "stdin"
default :codec, "line"
def initialize(*args)
super(*args)
@stop_requested = Concurrent::AtomicBoolean.new(false)
end
def register
@host = Socket.gethostname
fix_streaming_codecs
end
. . .
Each plugin also requires a name provided by the config_name
method. The config_name
provides Logstash with the name of the plugin.
We also specify the default codec the plugin uses, here line
, using default :codec
. The line
codec decodes events that are lines of text data.
Every plugin also has the register
method inside which you should specify anything needed to initialize the plugin, for example our stdin
input sets the @host
host name instance variable.
Each type of plugin then has a method that contains its core execution:
run
method, which is expected to run forever.filter
method.receive
method.Let's look our stdin
's run
method.
def run(queue)
while @stop_requested.false?
begin
# Based on some testing, there is no way to interrupt an IO.sysread nor
# IO.select call in JRuby. Bummer :(
data = $stdin.sysread(16384)
@codec.decode(data) do |event|
decorate(event)
event["host"] = @host if !event.include?("host")
queue << event
end
rescue IOError, EOFError, LogStash::ShutdownSignal
# stdin closed or a requested shutdown
@stop_requested.make_true
break
rescue => e
# ignore any exception in the shutdown process
break if @stop_requested.true?
raise(e)
end
end
finished
end
So what happens in our stdin
input? After the register
method initializes the plugin then the run
method is called. The run
method takes a parameter which is the queue of incoming data. In the case of the stdin
input the loop inside this method is initiated.
The input then runs until stopped, processing any incoming data from STDIN
, decoding the incoming data using the default codec specified and turning it into events.
The decorate
method then applies any metadata we've set in our configuration, for example if we've set a tag on the event.
The decoded and decorated event is then injected back into the queue to be passed to Logstash for any further processing.
One last method is defined in our stdin
input, teardown
. When this method is specified then Logstash will execute it when the plugin is being shutdown. It's useful for cleaning up, in this case closing the pipe.
def teardown
@stop_requested.make_true
@logger.debug("stdin shutting down.")
$stdin.close rescue nil
finished
end
Now we've got a broad understanding of how a plugin works let's now create one of our own. We're going to start with a simple plugin to read lines from a named pipe: a very simple pipe-based file
input.
First, let's create a directory to hold our plugin.
$ mkdir -p /src/logstash-input-namedpipe
Let's make it a Git repository.
$ cd /src/logstash-input-namedpipe
$ git init
Let's populate this directory with the input plugin template.
$ cd /tmp
$ git clone https://github.com/logstash-plugins/logstash-input-example.git
$ cd logstash-input-example
$ rm -rf .git
$ cp -R * /src/logstash-input-namedpipe/
Here we've changed into the /tmp
directory, used Git to clone the example input plugin template and then copied that template into our plugin directory.
We're now going to delete the example plugin file, rename the RSpec test file and rename our Gem specification.
$ cd /src/logstash-input-namedpipe
$ rm lib/logstash/input/example.rb
$ mv spec/inputs/example_spec.rb spec/inputs/namedpipe_spec.rb
$ mv logstash-input-example.gemspec logstash-input-namedpipe.gemspec
We'd also edit our Gem specification to update it to the correct name, version and update any required dependencies.
Next let's edit our input itself. First we created a file to hold our plugin.
$ cd logstash-input-namedpipe
$ touch lib/logstash/input/namedpipe.rb
Now let's populate our file, starting with adding our require
statements and creating our base class.
require 'logstash/namespace'
require 'logstash/inputs/base'
class LogStash::Inputs::NamedPipe < LogStash::Inputs::Base
. . .
end
We've added requires for an input and a class called LogStash::Inputs::NamedPipe
.
Now let's add in our plugin's name and status using the config_name
method. We're also going to specify the default codec, or format, this plugin will expect events to arrive in. We're going to specify the line
codec as we expect our events to be text strings.
require 'logstash/namespace'
require 'logstash/inputs/base'
class LogStash::Inputs::NamedPipe < LogStash::Inputs::Base
config_name "namedpipe"
default :codec, "line"
# The pipe to read from
config :pipe, :validate => :string, :required => true
. . .
end
You can see we've also added a configuration option, using the config
method. This method allows us to specify the configuration options and settings of our plugins, for example if we were configuring this input we could now use an option called pipe
:
input {
namedpipe {
pipe => "/tmp/ournamedpipe"
type => "pipe"
}
}
Configuration options have a variety of properties: you can validate the content of an option, for example we're validating that the pipe
option is a string
. You can add a default for an option, for example :default => "default option"
, or indicate that the option is required. If an option is required and that option is not provided then Logstash will not start.
Now let's add the guts of the namedpipe
input.
require 'logstash/namespace'
require 'logstash/inputs/base'
class LogStash::Inputs::NamedPipe < LogStash::Inputs::Base
config_name "namedpipe"
default :codec, "line"
config :pipe, :validate => :string, :required => true
public
def register
@logger.info("Registering namedpipe input", :pipe => @pipe)
end
def run(queue)
@pipe = open(pipe, "r+")
@pipe.each do |line|
line = line.chomp
host = Socket.gethostname
path = pipe
@logger.debug("Received line", :pipe => pipe, :line => line)
@codec.decode(line) do |event|
decorate(event)
event["host"] = host
event["path"] = path
queue << event
end
end
end
def teardown
@pipe.close
finished
end
end
We've added three new methods: register
, run
, and teardown
.
The register
method sends a log notification using the @logger
instance variable. Adding a log level method, in this case info
sends an information log message. We could also use debug
to send a debug-level message.
The run
method is our queue of log events. It opens a named pipe, identified using our pipe
configuration option. Our code constructs a source for our log event, that'll eventually populate the host
and path
fields in our event. We then generate a debug-level event and use the to_event
method to take the content from our named pipe, add our host and path and pass it to Logstash as an event. The run
method will keep sending events until the input is stopped.
When the input is stopped the teardown
method will be run. This method closes the named pipe and tells Logstash that the input is finished.
To build a Logstash plugin we treat it exactly like a Ruby gem. We can build our plugin based on the Gemspec.
First we'd install any dependencies with Bundler.
$ cd logstash-input-namedpipe
$ bundle install
Once this is done we can build our actual plugin gem.
$ cd logstash-input-namedpipe
$ gem build logstash-input-namedpipe.gemspec
This will create a new gem in the logstash-input-namedpipe
directory.
Now let's add our new plugin to Logstash and see it in action.
Adding new plugins to Logstash is done using the logstash-plugin
binary. You just need a copy of the Gem file you built of your plugin.
$ bin/logstash-plugin install /path/to/gemfile/logstash-input-namedpipe-0.1.0.gem
You should now be able to see the installed plugin in the list of plugins on that Logstash server.
$ bin/logstash-plugin list
. . .
logstash-input-namedpipe
. . .
Now we've written our first input let's look at another kind of plugin: a filter. As we've discovered filters are designed to manipulate events in some way. We've seen a variety of filters in Chapter 5 but we're going to write one of our own now. In this filter we're going to add a suffix to all message
fields. Let's start by adding the code for our filter:
require "logstash/filters/base"
require "logstash/namespace"
class LogStash::Filters::AddSuffix < LogStash::Filters::Base
config_name "addsuffix"
config :suffix, :validate => :string
public
def register
end
public
def filter(event)
if @suffix
msg = event["message"] + " " + @suffix
event["message"] = msg
end
filter_matched(event)
end
end
Let's examine what's happening in our filter. Firstly, we've required the prerequisite classes and defined a class for our filter: LogStash::Filters::AddSuffix
. We've also named and set the status of our filter, the experimental addsuffix
filter, using the config_name
method.
We've also specified a configuration option using the config
method which will contain the suffix which we will be adding to the event's message
field.
Next, we've specified an empty register
method as we're not performing any registration or plugin setup. The most important method, the filter
method itself, takes the event as a parameter. In our case it checks for the presence of the @suffix
instance variable that contains our configured suffix. If no suffix is configured the filter is skipped. If the suffix is present it is applied to the end of our message and the message returned.
The filter_matched(event)
method call at the end of our filter ensures any tags or other metadata specified in our configuration are applied to the event.
event.cancel
method.
Now we can configure our new filter, like so:
filter {
addsuffix {
suffix => "ALERT"
}
}
If we now run Logstash we'll see that all incoming events now have a suffix added to the message
field of ALERT
resulting in events like so:
{
"host" => "smoker.example.com",
"@timestamp" => "2013-01-21T18:43:34.531Z",
"message" => "testing ALERT",
"type" => "human"
}
You can now see how easy it is to manipulate events and their contents.
Our final task is to learn how to write the last type of plugin: an output. For our last plugin we're going to be a little flippant and create an output that generates CowSay events. First, we need to install a CowSay package, for example on Debian-distributions:
$ sudo apt-get install cowsay
Or via a RubyGem:
$ sudo gem install cowsay
This will provide a cowsay
binary our output is going to use.
Now let's look at our CowSay output's code:
require "logstash/outputs/base"
require "logstash/namespace"
class LogStash::Outputs::CowSay < LogStash::Outputs::Base
config_name "cowsay"
config :cowsay_log, :validate => :string, :default => "/var/log/cowsay.log"
public
def register
end
public
def receive(event)
msg = `cowsay #{event["message"]}`
File.open(@cowsay_log, 'a+') { |file| file.write("#{msg}") }
end
end
Our output requires the prerequisite classes and creates a class called LogStash::Outputs::CowSay
. We've specified the name of the output, cowsay
with config_name
method. We've specified a single configuration option using the config
method. The option, cowsay_log
specifies a default log file location, /var/log/cowsay.log
, for our log output.
Next we've specified an empty register
method as we don't have anything we'd like to register.
The guts of our output is in the receive
method which takes an event
as a parameter. In this method we've shell'ed out to the cowsay
binary and parsed the event["message"]
(the contents of the message
field) with CowSay. It then writes this "cow said" message to our /var/log/cowsay.log
file.
We can now configure our cowsay
output:
output {
cowsay {}
}
You'll note we don't specify any options and use the default destination. If we now run Logstash we can generate some CowSay statements like so:
You can see we have an animal message. It's easy to see how you can extend an output to send events or portions of events to a variety of destinations.
This has been a very simple introduction to writing Logstash plugins. It gives you the basics of each plugin type and how to use them. You can build on these examples easily enough and solve your own problems with plugins you've developed yourself.
3.15.143.181