Ingestion is a sensitive part of any architecture, so the Elasticsearch team provides a way to simulate your pipelines without storing them in Elasticsearch. The simulate pipeline API lets you test, improve, and check the functionality of a pipeline without deploying it in the Elasticsearch cluster.
You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 2, Downloading and Setup.
To execute curl via the command line, you need to install curl for your operating system.
To simulate an ingestion pipeline in Elasticsearch, we will perform the following steps:
curl -XPOST 'http://127.0.0.1:9200/_ingest/pipeline/_simulate' -d '{
  "pipeline": {
    "description": "Add user john field",
    "processors": [
      {
        "set": {
          "field": "user",
          "value": "john"
        }
      },
      {
        "set": {
          "field": "job",
          "value": 10
        }
      }
    ],
    "version": 1
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "1",
      "_source": {
        "name": "docs1"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "2",
      "_source": {
        "name": "docs2"
      }
    }
  ]
}'
{
  "docs" : [
    {
      "doc" : {
        "_index" : "index",
        "_id" : "1",
        "_type" : "type",
        "_source" : {
          "name" : "docs1",
          "job" : 10,
          "user" : "john"
        },
        "_ingest" : {
          "timestamp" : "2016-12-10T13:33:24.375+0000"
        }
      }
    },
    {
      "doc" : {
        "_index" : "index",
        "_id" : "2",
        "_type" : "type",
        "_source" : {
          "name" : "docs2",
          "job" : 10,
          "user" : "john"
        },
        "_ingest" : {
          "timestamp" : "2016-12-10T13:33:24.375+0000"
        }
      }
    }
  ]
}
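The same request body can be assembled programmatically. Here is a minimal Python sketch that builds the body from the curl example above; the host and port are assumptions, and the final POST is only indicated in a comment:

```python
import json

# Build the same simulate request body as the curl example above.
body = {
    "pipeline": {
        "description": "Add user john field",
        "processors": [
            {"set": {"field": "user", "value": "john"}},
            {"set": {"field": "job", "value": 10}},
        ],
        "version": 1,
    },
    "docs": [
        {"_index": "index", "_type": "type", "_id": "1",
         "_source": {"name": "docs1"}},
        {"_index": "index", "_type": "type", "_id": "2",
         "_source": {"name": "docs2"}},
    ],
}

payload = json.dumps(body)
# POST payload to http://127.0.0.1:9200/_ingest/pipeline/_simulate,
# for example with urllib.request or the official elasticsearch client.
```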
In a single call, the simulate pipeline API is able to test a pipeline on a set of sample documents.
The only required sections are the pipeline one and docs, which contains a list of documents. The documents (provided in docs) must be formatted with metadata fields and the source field, similar to a query result.
There are processors that are able to modify the metadata fields; for example, they are able to change _index or _type based on some contents. The metadata fields are _index, _type, _id, _routing, and _parent.
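As an illustration of a processor rewriting a metadata field, the following sketch defines a pipeline body whose set processor routes each document to an index derived from its content; the language field name is a hypothetical example:

```python
# Pipeline body whose set processor rewrites the _index metadata field,
# routing each document to an index derived from its "language" field.
reroute_pipeline = {
    "description": "Route documents by language",
    "processors": [
        {"set": {"field": "_index", "value": "docs-{{language}}"}}
    ],
}
```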
For debugging purposes, it is possible to add the verbose URL query argument to return all the intermediate steps of the pipeline. For example, we can change the call of the previous simulation to:
curl -XPOST 'http://127.0.0.1:9200/_ingest/pipeline/_simulate?verbose' -d '...truncated...'
The result will be expanded for every pipeline step:
{
  "docs" : [
    {
      "processor_results" : [
        {
          "doc" : {
            "_index" : "index",
            "_id" : "1",
            "_type" : "type",
            "_source" : {
              "name" : "docs1",
              "user" : "john"
            },
            "_ingest" : {
              "timestamp" : "2016-12-10T13:53:29.771+0000"
            }
          }
        },
        {
          "doc" : {
            "_index" : "index",
            "_id" : "1",
            "_type" : "type",
            "_source" : {
              "name" : "docs1",
              "job" : 10,
              "user" : "john"
            },
            "_ingest" : {
              "timestamp" : "2016-12-10T13:53:29.771+0000"
            }
          }
        }
      ]
    },
...truncated...
The simulate pipeline API is very handy when a user needs to check a complex pipeline that uses special field access, such as _ingest.timestamp, which is available only during ingestion. This kind of field provides values to be added to the document; for example:

{ "set": { "field": "received", "value": "{{_ingest.timestamp}}" } }
Using the {{}} template syntax, it's possible to inject other fields or join their values:

{ "set": { "field": "full_name", "value": "{{name}} {{surname}}" } }
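To make the {{}} substitution concrete, here is a tiny Python emulation of the template expansion; it mimics, rather than reproduces, the Mustache-based rendering Elasticsearch performs:

```python
import re

def render(template, doc):
    # Replace each {{field}} with the corresponding document value.
    return re.sub(r"\{\{(\w+)\}\}",
                  lambda m: str(doc.get(m.group(1), "")),
                  template)

doc = {"name": "John", "surname": "Smith"}
full_name = render("{{name}} {{surname}}", doc)
# full_name is now "John Smith"
```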
The ingest metadata fields (accessible via _ingest) are as follows:

timestamp: This contains the current pipeline timestamp.
on_failure_message: This is available only in the on_failure block in case of failure. It contains the failure message.
on_failure_processor_type: This is available only in the on_failure block in case of failure. It contains the type of the processor that generated the failure.
on_failure_processor_tag: This is available only in the on_failure block in case of failure. It contains the tag of the processor that generated the failure.