Dec 13th, 2018: [EN][Elasticsearch] Chaining Ingest Pipelines

Following up on yesterday's post about adding a timestamp at ingestion, we will be looking at the chaining of pipelines.

When ingesting data you will often be in a situation where you want to apply the same process to parse your data from multiple log sources.

In Logstash this is commonly done using conditionals to route log events to specific logstash filters.

However, with ingest pipelines , you were limited to a single pipeline that you had to specify when indexing an event. This caused you to create many different pipelines that all shared some common logic. As a result there were many downsides:

  • Duplicate logic in pipelines
  • Difficulty to maintain due to long pipelines
  • Tedious to update common logic in all pipelines

In the following example we will explore the ability to reference ingest pipelines to remove some of the complexity and duplication. For this we will use the new pipeline processor in 6.5. That allows us to call a pipeline from another pipeline:

{
  "pipeline": {
    "name": "other-pipeline"
  }
}

We will begin by describing the sample data we’re dealing with and then we will go over the pipeline definition in a simplified syntax. You can find the full pipelines in the appendix.

Imagine you have two sources of log data:

#SOURCE_1
"message": "2018-12-07T10:48:34.312Z 1.1.1.1 john 1KB"

#SOURCE_2
"message": "@timestamp=2018-12-07T10:48:34.312Z ip=1.1.1.1 user=john size=1KB"

While the data is the same, the formats are different. One requires us to use Grok or Dissect, the other can be parsed using the KV filter.

Our goals are to:

  • Parse the data
  • Convert the human readable “1KB” into bytes
  • Set the index name based on the timestamp
  • Add the current timestamp as described in yesterday's post

In pre 6.5 versions this required us to do the following:

source_1_pipeline

* Grok processor
* Bytes processor
* Date index name processor
* Script processor

source_2_pipeline

* KV processor
* Bytes processor
* Date index name processor
* Script processor

Even with these very simple log formats we are duplicating 75% of our pipelines.

As of 6.5 we can solve this the following way:

Common_pipeline

* Bytes processor
* Date index name processor
* Script processor

Source_1_pipeline

* Grok processor
* Pipeline processor (call the common pipeline)

Source_2_pipeline

* KV processor
* Pipeline processor (call the common pipeline)

The setup may be a little harder to understand as you have to view multiple pipelines to get a full understanding of what it does, but it will enable you to maintain a single common pipeline that deals with general transformations to your data.

As you will realize when reading this, it's not actually a chain, but that term is easier to understand.

We hope that this will simplify your ingestion and make it easier to handle!

Full Ingest Pipelines:


common pipeline

PUT _ingest/pipeline/common
{
  "description": "sets ingest timestamp and date based index name",
  "processors": [
    {
      "bytes": {
        "field": "size"
      }
    },
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "date_index_name": {
        "field": "@timestamp",
        "index_name_prefix": "{{_index}}-",
        "date_rounding": "d"
      }
    }
  ]
}

source_1_pipeline, using the simulate API to send the sample event

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "parse source_1 and send to common pipeline",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{TIMESTAMP_ISO8601:@timestamp} %{IP:src.ip} %{USER:user} %{NOTSPACE:size}"
          ]
        }
      },
      {
        "pipeline": {
          "name": "common"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2018-12-07T10:48:34.312Z 1.1.1.1 john 1KB"
      }
    }
  ]
}

source_2_pipeline, using the simulate API to send the sample event

{
  "pipeline": {
    "description": "parse source_2 and send to common pipeline",
    "processors": [
      {
        "kv": {
          "field": "message",
          "field_split": " ",
          "value_split": "="
        }
      },
      {
        "pipeline": {
          "name": "common"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "@timestamp=2018-12-07T10:48:34.312Z ip=1.1.1.1 user=john size=1KB"
      }
    }
  ]
}
2 Likes