RabbitMQ is a fast message broker that can handle thousands of messages per second. It can be very handy when used in conjunction with ElasticSearch to bulk index records.
The RabbitMQ river plugin is designed to consume messages that contain bulk operations and index them.
You need a working ElasticSearch cluster and a working RabbitMQ instance installed on the same machine as ElasticSearch.
For using the RabbitMQ river, we need to perform the following steps:

1. First, install the RabbitMQ river plugin:

bin/plugin -install elasticsearch/elasticsearch-river-rabbitmq/1.6.0

The result will be as follows:

-> Installing elasticsearch/elasticsearch-river-rabbitmq/1.6.0...
Trying http://download.elasticsearch.org/elasticsearch/elasticsearch-river-rabbitmq/elasticsearch-river-rabbitmq-1.6.0.zip...
Downloading ....................DONE
Installed river-rabbitmq

2. Restart your ElasticSearch node and check in the log that the plugin is loaded correctly:

[2013-08-12 23:08:43,639][INFO ][plugins ] [Fault Zone] loaded [river-rabbitmq, river-twitter, transport-thrift, river-mongodb, mapper-attachments, lang-python, river-couchdb, lang-javascript], sites [bigdesk, head]

3. Create a config.json file to configure the river, as follows:

{
  "type" : "rabbitmq",
  "rabbitmq" : {
    "host" : "localhost",
    "port" : 5672,
    "user" : "guest",
    "pass" : "guest",
    "vhost" : "/",
    "queue" : "elasticsearch",
    "exchange" : "elasticsearch",
    "routing_key" : "elasticsearch",
    "exchange_declare" : true,
    "exchange_type" : "direct",
    "exchange_durable" : true,
    "queue_declare" : true,
    "queue_bind" : true,
    "queue_durable" : true,
    "queue_auto_delete" : false,
    "heartbeat" : "30m"
  },
  "index" : {
    "bulk_size" : 100,
    "bulk_timeout" : "10ms",
    "ordered" : false
  }
}

4. Create the river using the configuration file:

curl -XPUT 'http://127.0.0.1:9200/_river/rabbitriver/_meta' -d @config.json

The result will be as follows:

{"ok":true,"_index":"_river","_type":"rabbitriver","_id":"_meta","_version":1}
The RabbitMQ river instantiates a connection to the RabbitMQ server and waits for messages to process. The only kind of messages the plugin is able to process are bulk operations.
Typically, the connection is a direct one, which means that as soon as a message is sent to the server, it is routed to the client.
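The body of each message must be in the ElasticSearch bulk API format: newline-delimited JSON, where an index or create action line is followed by a source line. As a sketch, the following Python snippet builds such a payload with only the standard library (the index and type names are illustrative):

```python
import json

def build_bulk_body(docs, index="test", doc_type="doc"):
    """Build a newline-delimited bulk payload from (id, source) pairs."""
    lines = []
    for doc_id, source in docs:
        # Action line: tells ElasticSearch what to do with the next line.
        lines.append(json.dumps(
            {"index": {"_index": index, "_type": doc_type, "_id": doc_id}}))
        # Source line: the document itself.
        lines.append(json.dumps(source))
    # The bulk format requires a trailing newline.
    return "\n".join(lines) + "\n"

body = build_bulk_body([("1", {"field1": "value1"}),
                        ("2", {"field1": "value2"})])
print(body)
```

A string like this would then be published to the elasticsearch queue with any RabbitMQ client library (for example, pika in Python); the river consumes it and forwards it to the bulk API.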
The river type is rabbitmq and all client configurations live in the rabbitmq object. The most common parameters are as follows:
- host (default localhost): This defines the RabbitMQ server address.
- port (default 5672): This defines the RabbitMQ server port.
- user and pass: These define the username and password credentials required to access the RabbitMQ server.
- vhost (default /): This defines the RabbitMQ virtual host to be used.
- exchange_declare (true/false) and exchange (default elasticsearch): These control whether an exchange must be declared and the exchange name.
- exchange_type (default direct): This defines the type of exchange to be used.
- exchange_durable (default true): This defines a durable exchange that survives a RabbitMQ broker restart; otherwise it is transient.
- queue_declare (true/false) and queue (default elasticsearch): These control whether a queue must be declared and the queue name.
- queue_durable (default true): This defines a durable queue that survives a RabbitMQ broker restart; otherwise it is transient.
- queue_auto_delete (default false): This defines whether a queue is automatically deleted once consumers have finished processing all its messages.
- heartbeat: This sets the heartbeat delay on the connection. It is used to prevent the connection from dropping during periods of network inactivity.

Sometimes the RabbitMQ server is configured in cluster mode for higher availability. In this configuration there is not a single host, but a list of hosts. They can be defined as a list of addresses in the following way:
{
  "rabbitmq" : {
    "addresses" : [
      { "host" : "host1", "port" : 5672 },
      { "host" : "host2", "port" : 5672 }
    ]
  }
  ...
}
The RabbitMQ river plugin allows you to control, via scripting, two important aspects of bulk processing: the global bulk with bulk_script_filter, and every single document that must be indexed or created with script_filter.
The definition of these two script filters is given by the following parameters:

- script: This is the code of the script.
- script_lang: This is the language to be used to interpret the code.
- script_params: This is a dictionary/map of key-value pairs containing additional parameters to be passed to the script.

The bulk_script_filter will receive a block of text (body) consisting of a list of actions. The script must return another block of text to be processed by ElasticSearch. If the script returns null, the bulk is skipped.
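To illustrate this contract (text in, text out, or null to skip), here is a hypothetical Python sketch of an equivalent filter that drops all actions targeting a given index; an actual river script would be written in one of the supported scripting languages:

```python
import json

def bulk_filter(body, skip_index="private"):
    """Filter a bulk body, dropping action/source pairs for skip_index.

    Returns the filtered bulk text, or None to skip the whole bulk.
    Handles index/create (which carry a source line) and delete
    (which does not); update is omitted for brevity.
    """
    out = []
    lines = body.strip().split("\n")
    i = 0
    while i < len(lines):
        action = json.loads(lines[i])
        op = next(iter(action))            # e.g. "index", "create", "delete"
        has_source = op in ("index", "create")
        if action[op].get("_index") != skip_index:
            out.append(lines[i])
            if has_source:
                out.append(lines[i + 1])
        i += 2 if has_source else 1
    if not out:
        return None                        # returning null skips the bulk
    return "\n".join(out) + "\n"

bulk = (
    '{"index": {"_index": "public", "_id": "1"}}\n'
    '{"field1": "value1"}\n'
    '{"index": {"_index": "private", "_id": "2"}}\n'
    '{"secret": true}\n'
)
filtered = bulk_filter(bulk)
print(filtered)
```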
An example of the bulk_script_filter
declaration is as follows:
{
  "type" : "rabbitmq",
  "rabbitmq" : {
    ...
  },
  "index" : {
    ...
  },
  "bulk_script_filter" : {
    "script" : "myscript",
    "script_lang" : "native",
    "script_params" : {
      "param1" : "val1",
      "param2" : "val2"
      ...
    }
  }
}
If a script_filter is defined, a ctx context is passed to the script for every document that must be indexed or created.
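Assuming ctx is a map that groups the document's fields by type (as the dotted path in the mvel example below suggests), a rough Python analogue of the per-document transformation would be:

```python
# Hypothetical Python analogue of the mvel script
# "ctx.type1.field1 += param1": increment a field of every
# document of type "type1" before it is indexed.
def script_filter(ctx, param1):
    # ctx is assumed to map the document type to its source fields.
    ctx["type1"]["field1"] += param1
    return ctx

doc = {"type1": {"field1": 10}}
result = script_filter(doc, 1)
print(result)
```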
An example of the script_filter
declaration is as follows:
{
  "type" : "rabbitmq",
  "rabbitmq" : {
    ...
  },
  "index" : {
    ...
  },
  "script_filter" : {
    "script" : "ctx.type1.field1 += param1",
    "script_lang" : "mvel",
    "script_params" : {
      "param1" : 1
    }
  }
}
Using the RabbitMQ broker can be very powerful, as it allows you to support high load and balancing by moving load peaks onto the RabbitMQ message queue. A message queue such as RabbitMQ is far faster than ElasticSearch at processing inserts, because it does not need to index the data. So it can be a good frontend to absorb ElasticSearch indexing peaks, and it also allows deferring bulk execution if an ElasticSearch node is down.