Using the RabbitMQ river

RabbitMQ is a fast message broker that can handle thousands of messages per second. It is very handy in conjunction with ElasticSearch for bulk indexing records.

The RabbitMQ river plugin waits for messages that contain bulk operations and indexes them.

Getting ready

You need a working ElasticSearch cluster and a working RabbitMQ instance installed on the same machine as ElasticSearch.

How to do it...

To use the RabbitMQ river, we need to perform the following steps:

  1. First, we need to install the RabbitMQ river plugin, which is available on GitHub (https://github.com/elasticsearch/elasticsearch-river-rabbitmq). We can install the river plugin in the following way:
    bin/plugin -install elasticsearch/elasticsearch-river-rabbitmq/1.6.0
  2. The result should be as follows:
    -> Installing elasticsearch/elasticsearch-river-rabbitmq/1.6.0...
    Trying http://download.elasticsearch.org/elasticsearch/elasticsearch-river-rabbitmq/elasticsearch-river-rabbitmq-1.6.0.zip...
    Downloading ....................DONE
    Installed river-rabbitmq
  3. Restart your ElasticSearch node to ensure that the river plugin is properly loaded. In the log you should see the following result:
    [2013-08-12 23:08:43,639][INFO ][plugins                  ] [Fault Zone] loaded [river-rabbitmq, river-twitter, transport-thrift, river-mongodb, mapper-attachments, lang-python, river-couchdb, lang-javascript], sites [bigdesk, head]
  4. We need to create a config.json file to configure the river, as follows:
    {
      "type" : "rabbitmq",
      "rabbitmq" : {
        "host" : "localhost",
        "port" : 5672,
        "user" : "guest",
        "pass" : "guest",
        "vhost" : "/",
        "queue" : "elasticsearch",
        "exchange" : "elasticsearch",
        "routing_key" : "elasticsearch",
        "exchange_declare" : true,
        "exchange_type" : "direct",
        "exchange_durable" : true,
        "queue_declare" : true,
        "queue_bind" : true,
        "queue_durable" : true,
        "queue_auto_delete" : false,
        "heartbeat" : "30m"
      },
      "index" : {
        "bulk_size" : 100,
        "bulk_timeout" : "10ms",
        "ordered" : false
      }
    }
  5. Now we can create the river with the current configuration as follows:
    curl -XPUT 'http://127.0.0.1:9200/_river/rabbitriver/_meta' -d @config.json
  6. The result will be as follows:
    {"ok":true,"_index":"_river","_type":"rabbitriver","_id":"_meta","_version":1}
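
The same registration can be scripted. The following is a minimal sketch that uses only the Python standard library; the function names build_river_request and create_river are illustrative and not part of the plugin, and the URL assumes the local node and the rabbitriver name used in the curl call above.

```python
import json
import urllib.request


def build_river_request(river_name, config, base_url="http://127.0.0.1:9200"):
    """Build (but do not send) the PUT request that registers a river."""
    url = f"{base_url}/_river/{river_name}/_meta"
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def create_river(river_name, config_path):
    """Read the river configuration from a file and send it to the node."""
    with open(config_path) as handle:
        config = json.load(handle)
    request = build_river_request(river_name, config)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling create_river("rabbitriver", "config.json") against a running node should return the same JSON acknowledgement that curl prints.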

How it works...

The RabbitMQ river instantiates a connection to the RabbitMQ server and waits for messages to process. The only kind of message that the plugin is able to process is a bulk operation.

Tip

Every bulk operation must terminate with a newline character; otherwise, the last operation will be swallowed (silently ignored).

Typically, the connection is a direct one, which means that as soon as a message is sent to the server, it is redirected to the client.
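
To make the newline rule concrete, here is a minimal sketch of a helper that serializes bulk operations so that every line, including the last one, ends with a newline. The helper name build_bulk_body and the test index and type1 type are assumptions made for illustration.

```python
import json


def build_bulk_body(operations):
    """Serialize (action, source) pairs into a newline-terminated bulk body.

    `source` is None for actions that carry no document line, such as delete.
    """
    lines = []
    for action, source in operations:
        lines.append(json.dumps(action))
        if source is not None:
            lines.append(json.dumps(source))
    # Terminate every line, the last one included, with "\n".
    return "".join(line + "\n" for line in lines)


body = build_bulk_body([
    ({"index": {"_index": "test", "_type": "type1", "_id": "1"}},
     {"field1": "value1"}),
    ({"delete": {"_index": "test", "_type": "type1", "_id": "2"}}, None),
])
```

The resulting body string is what would be sent as the payload of a single RabbitMQ message.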

The river type is rabbitmq and all client configurations live on the rabbitmq object. The most common parameters are as follows:

  • host (default localhost): This defines the RabbitMQ server address.
  • port (default 5672): This defines the RabbitMQ server port.
  • user and pass: These define the username and password credentials required to access the RabbitMQ server.
  • vhost (default /): This defines the RabbitMQ virtual host to be used.
  • exchange_declare (false/true) and exchange (default elasticsearch): These control whether the exchange must be declared and the exchange name.
  • exchange_type (default direct): This defines the type of exchange to be used.
  • exchange_durable (default true): This defines a durable exchange that survives a RabbitMQ broker restart; otherwise, it is transient.
  • queue_declare (false/true) and queue (default elasticsearch): These control whether the queue must be declared and the queue name.
  • queue_durable (default true): This defines a durable queue that survives a RabbitMQ broker restart; otherwise, it is transient.
  • queue_auto_delete (default false): This defines whether the queue is automatically deleted once its last consumer disconnects.
  • heartbeat: This sets the heartbeat delay of the connection. It's used to prevent the connection from being dropped during periods of network inactivity.
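
The parameters above can also be assembled programmatically. The following sketch starts from the defaults listed in this section and applies overrides on top; make_river_config is a hypothetical helper, and the guest credentials are an assumption taken from the recipe's config.json because no default is stated for them.

```python
import json

# Defaults as listed above. "user" and "pass" have no stated default, so the
# "guest" values from the recipe's config.json are assumed here.
RABBITMQ_SETTINGS = {
    "host": "localhost",
    "port": 5672,
    "user": "guest",
    "pass": "guest",
    "vhost": "/",
    "exchange": "elasticsearch",
    "exchange_type": "direct",
    "exchange_durable": True,
    "queue": "elasticsearch",
    "queue_durable": True,
    "queue_auto_delete": False,
}


def make_river_config(**overrides):
    """Return a river definition with the given rabbitmq settings overridden."""
    rabbitmq = dict(RABBITMQ_SETTINGS)
    rabbitmq.update(overrides)
    return {"type": "rabbitmq", "rabbitmq": rabbitmq}


config = make_river_config(host="mq.example.org", heartbeat="30m")
config_json = json.dumps(config, indent=2)
```

The config_json string can be saved as config.json and used to create the river.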

Sometimes the RabbitMQ server is configured in cluster mode for higher availability. In this configuration, there is no single host, but a list of hosts. They can be defined as a list of addresses in the following way:

{
    "rabbitmq" : {
        "addresses" : [
            {
                "host" : "host1",
                "port" : 5672
            },
            {
                "host" : "host2",
                "port" : 5672
            }
        ]
    }
    ...
}
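
When the broker list comes from a deployment script, the addresses section can be generated. This is a small sketch under the assumption that hosts are given as "host" or "host:port" strings; cluster_addresses is an illustrative helper, not a plugin function.

```python
def cluster_addresses(hosts, default_port=5672):
    """Turn "host" or "host:port" strings into the river's addresses list."""
    addresses = []
    for entry in hosts:
        host, _, port = entry.partition(":")
        addresses.append({
            "host": host,
            "port": int(port) if port else default_port,
        })
    return {"addresses": addresses}


rabbitmq_section = cluster_addresses(["host1", "host2:5673"])
```

The returned dictionary is meant to be used as the value of the rabbitmq key in the river configuration.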

There's more…

The RabbitMQ river plugin lets you control two important aspects of bulk processing with scripting: the whole bulk, with bulk_script_filter, and every single document that must be indexed or created, with script_filter.

Both script filters are defined with the following fields:

  • script: This is the code of the script.
  • script_lang: This is the language to be used to interpret the code.
  • script_params: This is a dictionary (a map of key-value pairs) containing the additional parameters to be passed to the script.

The bulk_script_filter receives a block of text (body) that contains the list of actions. The script must return another block of text to be processed by ElasticSearch. If the script returns null, the bulk is skipped.

An example of the bulk_script_filter declaration is as follows:

{
    "type" : "rabbitmq",
    "rabbitmq" : {
        ...
    },
    "index" : {
        ...
    },
    "bulk_script_filter" : {
        "script" : "myscript",
        "script_lang" : "native",
        "script_params" : {
            "param1" : "val1",
            "param2" : "val2"
            ...
        }
    }
}
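
The contract of bulk_script_filter (text in, text or null out) can be illustrated outside ElasticSearch. The following Python function is only a simulation of what such a filter might do, here dropping delete actions; it is not the plugin's scripting API, and a real filter would be written in the language named by script_lang.

```python
import json


def drop_deletes(body):
    """Return the bulk body without delete actions, or None if nothing is left."""
    lines = iter(body.splitlines())
    kept = []
    for action_line in lines:
        if not action_line.strip():
            continue
        operation = next(iter(json.loads(action_line)))
        if operation == "delete":
            continue  # delete actions are not followed by a source line
        source_line = next(lines, None)  # the document that follows the action
        if source_line is None:
            break  # truncated bulk: the action has no document, so drop it
        kept.append(action_line)
        kept.append(source_line)
    if not kept:
        return None  # mirrors "the script returns null, the bulk is skipped"
    return "".join(line + "\n" for line in kept)
```

Returning None for an empty result mirrors the rule that a null result skips the whole bulk.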

If a script_filter is defined, a ctx context is passed to the script for every document that is to be indexed or created.

An example of the script_filter declaration is as follows:

{
    "type" : "rabbitmq",
    "rabbitmq" : {
        ...
    },
    "index" : {
        ...
    },
    "script_filter" : {
        "script" : "ctx.type1.field1 += param1",
        "script_lang" : "mvel",
        "script_params" : {
          "param1" : 1
        }
    }
}
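
To show the effect of the script above, the following sketch mimics it in Python on a plain dictionary. The shape of ctx used here (a type1 object holding field1) is an assumption inferred from the script text, and apply_script_filter is a hypothetical stand-in for the script engine.

```python
def apply_script_filter(ctx, params):
    """Python equivalent of the script: ctx.type1.field1 += param1."""
    ctx["type1"]["field1"] += params["param1"]
    return ctx


document = {"type1": {"field1": 41}}
result = apply_script_filter(document, {"param1": 1})
```

With param1 set to 1, as in the declaration above, field1 is incremented by one for each processed document.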

Putting a RabbitMQ broker in front of ElasticSearch can be very powerful: it helps to sustain and balance high load by moving the load peaks onto the RabbitMQ message queue. A message queue such as RabbitMQ accepts inserts far faster than ElasticSearch because it doesn't need to index the data. So, it can be a good frontend to absorb ElasticSearch indexing peaks, and it also allows bulk operations to be executed later if an ElasticSearch node is down.
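
On the producer side, an application only has to publish bulk bodies to the exchange that the river listens on. The following is a minimal sketch, assuming the third-party pika client is installed and that the exchange and routing key are the elasticsearch values from the recipe's configuration; open_channel and publish_bulk are illustrative names.

```python
def open_channel(host="localhost", port=5672, user="guest",
                 password="guest", vhost="/"):
    """Open a blocking connection and return (connection, channel)."""
    import pika  # third-party RabbitMQ client, assumed to be installed

    credentials = pika.PlainCredentials(user, password)
    parameters = pika.ConnectionParameters(
        host=host, port=port, virtual_host=vhost, credentials=credentials
    )
    connection = pika.BlockingConnection(parameters)
    return connection, connection.channel()


def publish_bulk(channel, body, exchange="elasticsearch",
                 routing_key="elasticsearch"):
    """Publish one bulk body, making sure it ends with a newline."""
    if not body.endswith("\n"):
        body += "\n"
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
    return body
```

Because publish_bulk only needs an object with a basic_publish method, it can be exercised with a stub channel in tests, while open_channel is needed only when a real broker is available.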

See also
