Simulate an ingest pipeline

The ingest part of every architecture is very sensitive, so the Elasticsearch team provides a way to simulate your pipelines without having to store them in Elasticsearch.

The simulate pipeline API allows a user to test, improve, and check the functionality of a pipeline without deploying it in the Elasticsearch cluster.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 2, Downloading and Setup.

To execute curl via the command line, you need to install curl for your operating system.

How to do it...

To simulate an ingestion pipeline in Elasticsearch, we will perform the following steps:

  1. We need to execute a call, passing both the pipeline and a sample subset of documents to test the pipeline against:
            curl -XPOST 'http://127.0.0.1:9200/_ingest/pipeline/_simulate' \
            -d '{
              "pipeline": {
                "description": "Add user john field",
                "processors": [
                  {
                    "set": {
                      "field": "user",
                      "value": "john"
                    }
                  },
                  {
                    "set": {
                      "field": "job",
                      "value": 10
                    }
                  }
                ],
                "version": 1
              },
              "docs": [
                {
                  "_index": "index",
                  "_type": "type",
                  "_id": "1",
                  "_source": {
                    "name": "docs1"
                  }
                },
                {
                  "_index": "index",
                  "_type": "type",
                  "_id": "2",
                  "_source": {
                    "name": "docs2"
                  }
                }
              ]
            }'
    
  2. The result returned by Elasticsearch, if everything is okay, is a list of documents with the pipeline applied:
            {
              "docs" : [
                {
                  "doc" : {
                    "_index" : "index",
                    "_id" : "1",
                    "_type" : "type",
                    "_source" : {
                      "name" : "docs1",
                      "job" : 10,
                      "user" : "john"
                    },
                    "_ingest" : {
                      "timestamp" : "2016-12-10T13:33:24.375+0000"
                    }
                  }
                },
                {
                  "doc" : {
                    "_index" : "index",
                    "_id" : "2",
                    "_type" : "type",
                    "_source" : {
                      "name" : "docs2",
                      "job" : 10,
                      "user" : "john"
                    },
                    "_ingest" : {
                      "timestamp" : "2016-12-10T13:33:24.375+0000"
                    }
                  }
                }
              ]
            }
    

How it works...

In a single call, the simulate pipeline API is able to test a pipeline against a subset of documents. It internally executes the following steps:

  1. It parses the provided pipeline definition, creating an in-memory representation of the pipeline.
  2. It reads the provided documents, applying the pipeline to each of them.
  3. It returns the processed results.

The only required sections are the pipeline one and docs, which contains the list of documents to test. The documents (provided in docs) must be formatted with metadata fields and the _source field, similar to a query result.
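
As a reference, the following is a minimal sketch of a valid simulate body, assuming the same endpoint as before: only the processors list inside pipeline and the _source of each document are strictly required, while the metadata fields can be omitted (the field values used here are illustrative):

    curl -XPOST 'http://127.0.0.1:9200/_ingest/pipeline/_simulate' \
    -d '{
      "pipeline": {
        "processors": [
          { "set": { "field": "user", "value": "john" } }
        ]
      },
      "docs": [
        { "_source": { "name": "docs1" } }
      ]
    }'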

There are processors that are able to modify the metadata fields; for example, they are able to change _index or _type based on the document contents. The metadata fields are _index, _type, _id, _routing, and _parent.
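
Because of this, a set processor can target a metadata field directly. The following sketch (the index name is illustrative) would route every processed document to a different index during ingestion:

    {
      "set": {
        "field": "_index",
        "value": "processed-index"
      }
    }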

For debugging purposes, it is possible to add the verbose URL query argument to return all the intermediate steps of the pipeline. For example, we can change the call of the previous simulation to the following:

    curl -XPOST 'http://127.0.0.1:9200/_ingest/pipeline/_simulate?verbose' \
    -d '...truncated...'

The result will be expanded for every pipeline step:

{ 
  "docs" : [ 
    { 
      "processor_results" : [ 
        { 
          "doc" : { 
            "_index" : "index", 
            "_id" : "1", 
            "_type" : "type", 
            "_source" : { 
              "name" : "docs1", 
              "user" : "john" 
            }, 
            "_ingest" : { 
              "timestamp" : "2016-12-10T13:53:29.771+0000" 
            } 
          } 
        }, 
        { 
          "doc" : { 
            "_index" : "index", 
            "_id" : "1", 
            "_type" : "type", 
            "_source" : { 
              "name" : "docs1", 
              "job" : 10, 
              "user" : "john" 
            }, 
            "_ingest" : { 
              "timestamp" : "2016-12-10T13:53:29.771+0000" 
            } 
          } 
        } 
      ] 
    }, ...truncated... 

There's more...

The simulate pipeline API is very handy when a user needs to check a complex pipeline that uses special field access, such as the following:

  • Ingest metadata fields: These are special metadata fields, such as _ingest.timestamp, that are available during ingestion. These kinds of fields provide values to be added to the document; for example:
        { 
          "set": { 
            "field": "received" 
            "value": "{{_ingest.timestamp}}" 
          } 
        } 
  • Field replace templating: Using templating with {{}}, it's possible to inject other fields or join their values:
        { 
          "set": { 
            "field": "full_name" 
            "value": "{{name}} {{surname}}" 
          } 
        } 

The ingest metadata fields (accessible via _ingest) are as follows:

  • timestamp: This contains the current pipeline timestamp.
  • on_failure_message: This is available only in the on_failure block in case of failure. It contains the failure message.
  • on_failure_processor_type: This is available only in the on_failure block in case of failure. It contains the type of the processor that generated the failure.
  • on_failure_processor_tag: This is available only in the on_failure block in case of failure. It contains the tag of the processor that generated the failure (see the sketch after this list).
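
As a sketch of how these on_failure fields can be used, the following pipeline fragment (the field names are illustrative) stores the failure message in the document itself when a processor fails; here, a rename processor that fails if the source_host field is missing:

    {
      "rename": {
        "field": "source_host",
        "target_field": "host",
        "on_failure": [
          {
            "set": {
              "field": "error",
              "value": "{{_ingest.on_failure_message}}"
            }
          }
        ]
      }
    }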