How to use pipelines and processors

If I want to extract the numeric duration value from:
words::words::words::words (duration=432, words)

Would this grok pipeline work?

PUT _ingest/pipeline/parse
{
  "description" : "parses the duration field",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{NUMBER:duration}"]
      }
    }
  ]
}

Is there anything else I have to do after creating this pipeline in order to use it?

You can _simulate the pipeline to verify that it works as you intend.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
      "processors" : [
         {
            "grok": {
               "field": "message",
               "patterns": ["%{NUMBER:duration}"]
            }
         }
      ]
   },
  "docs": [
       {"_id": "asdf", "_source": {"message": "words::words::words::words (duration=432, words)"}}
  ]
}

Results in

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "asdf",
        "_source" : {
          "duration" : "432",
          "message" : "words::words::words::words (duration=432, words)"
        },
        "_ingest" : {
          "timestamp" : "2019-05-31T13:04:20.599Z"
        }
      }
    }
  ]
}
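
Note that duration was extracted as a string ("432"), since grok captures text out of the message. If you want it indexed as a numeric field (e.g. for numeric aggregations later), one option is to append a convert processor after the grok processor. A minimal sketch of the same pipeline with that addition:

PUT _ingest/pipeline/parse
{
  "description" : "parses the duration field",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{NUMBER:duration}"]
      }
    },
    {
      "convert": {
        "field": "duration",
        "type": "integer"
      }
    }
  ]
}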

As for using a pipeline, you can use one at ingest time by specifying the pipeline URL parameter when POSTing docs, e.g. POST my_index/_doc?pipeline=my_pipeline
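
For example, with the parse pipeline created above (my_index is a placeholder index name):

POST my_index/_doc?pipeline=parse
{
  "message": "words::words::words::words (duration=432, words)"
}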

Recently, in 6.5, a new index setting was added: default_pipeline. That allows a pipeline to be run on all docs without you having to specify it.


Great, that was a lot of help. My pipeline works as expected. Now, my logs come in automatically through fluentd, I believe, and I have version 6.3 of Kibana, unfortunately. Is there a way to apply this pipeline to documents coming in, or is there something I can do to get this working on its own?

@nicks1993 you need to change whatever is POST'ing the docs to the ES cluster to use the pipeline.


Running that gives me this error:

{
  "error": {
    "root_cause": [
      {
        "type": "parse_exception",
        "reason": "request body is required"
      }
    ],
    "type": "parse_exception",
    "reason": "request body is required"
  },
  "status": 400
}

@nicks1993 that error means the request was sent without a document body; the _doc endpoint needs a document to index. More generally, whatever you are using to POST the documents to the index is what needs to be changed to use the pipeline.

e.g. Logstash:

output {
  elasticsearch {
    # es settings (hosts, index, etc.)
    pipeline => "rename_hostname"
    # ^ your pipeline
  }
}

Posting docs manually:

POST my_index/_doc/?pipeline=my_pipeline
{
// doc body
}

This blog covers some use cases: https://www.elastic.co/blog/new-way-to-ingest-part-1


Okay cool, it looks like I'll have to add that to fluentd, because I believe we are using EFK here. Thanks, I'll try to pass the information along to someone who can give me access to fluentd.

Thanks for all the help, hopefully this will solve my problem.
Now, do I need to create another field named duration? Because I did that already; will that affect whether it gets stored in that field?
And once it is stored, I can do normal aggregations on it like any other field, right?

You should not have to create another field named duration. You can see from the _simulate call that the grok processor added the field. If you want to see its behavior given an existing duration field, I suggest running that simulation.
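
For example, a sketch of that simulation with a pre-existing duration value in the source, which will show you whether the grok processor overwrites it:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors" : [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{NUMBER:duration}"]
        }
      }
    ]
  },
  "docs": [
    {
      "_id": "asdf",
      "_source": {
        "duration": "pre-existing value",
        "message": "words::words::words::words (duration=432, words)"
      }
    }
  ]
}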

Yep, it is just another field.


Sweet, thanks. I'll have to run it by management now to get it running.
If we decide to upgrade to version 6.5, what would the process look like? Would I just set the default pipeline from Dev Tools in Kibana?

@nicks1993 since it is an index setting, you will need to update the settings.

PUT /my_index/_settings
{
    "index" : {
        "default_pipeline" : "my_pipeline"
    }
}
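
To confirm the setting took effect, you can read it back:

GET my_index/_settings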

index.default_pipeline
The default ingest node pipeline for this index. Index requests will fail if the default pipeline is set and the pipeline does not exist. The default may be overridden using the pipeline parameter. The special pipeline name _none indicates no ingest pipeline should be run.
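
For example, to bypass the default pipeline for a single request:

POST my_index/_doc?pipeline=_none
{
  "message": "this doc skips the default pipeline"
}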


Hmm, that's pretty cool. I just tried it and confirmed it does not work on my version of Elasticsearch.
Is there no way to apply a pipeline to an index in older versions?

@nicks1993 in older versions (pre 6.5.0), the ingest pipeline must be supplied when the document is sent to ES, either by putting it in the URL or by updating whatever automatic system is pushing the logs to ES (e.g. Fluentd, Logstash, Beats, etc.).
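
For Fluentd specifically, here is a sketch assuming the fluent-plugin-elasticsearch output plugin, which has a pipeline option (check that your plugin version supports it; the match tag, host, and port are placeholders):

<match my.logs.**>
  @type elasticsearch
  host localhost
  port 9200
  index_name my_index
  # run the ingest pipeline created above
  pipeline parse
</match>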


Perfect, thanks again
