Ingest pipeline not working but working in the simulate API

I have a pipeline that filters Palo Alto logs through Grok patterns and other processors. Testing with the simulate API shows that it works, but when I try to ingest into Elasticsearch none of the documents go through. I did some tweaking in the elasticsearch output, which did send a few of the documents, but the pipeline was still being ignored.

The dataset for these logs is quite a bit larger than what I usually work with, though, so I am wondering if the pipeline is just too slow to process each document.

{
 "version": 1, 
 "processors" : [
{
  "grok" : {
    "field" : "message",
    "patterns" : ["%{TRAFFIC:traffic}","%{SYSTEM:system}"],
    "pattern_definitions": {
      
      "TRAFFIC" : "%{GREEDYDATA:ignore} - - - - 1,%{DATA:receive_time},%{NUMBER:serial},%{WORD:type},%{WORD:subtype},%{NUMBER:config_version},%{DATA:time_generated},%{IP:src},%{IP:dst},%{IP:natsrc},%{IP:natdst},%{DATA:rule},%{DATA:srcuser},%{DATA:dstuser},%{WORD:app},%{WORD:vsys},%{WORD:from},%{DATA:to},%{DATA:inbound_if},%{DATA:outbound_if},%{DATA:logset},%{DATA:FUTURE_USE},%{NUMBER:sessionid},%{NUMBER:repeatcnt},%{NUMBER:sport},%{NUMBER:dport},%{NUMBER:natsport},%{NUMBER:natdport},%{WORD:flags},%{WORD:proto},%{WORD:action},%{NUMBER:bytes},%{NUMBER:bytes_sent},%{NUMBER:bytes_received},%{NUMBER:packets},%{DATA:start},%{NUMBER:sec},%{WORD:category},%{NUMBER:tpadding},%{NUMBER:seqno},%{WORD:actionflags},%{DATA:srcloc},%{DATA:dstloc},%{NUMBER:cpadding},%{NUMBER:pkts_sent},%{NUMBER:pkts_received},%{DATA:session_end_reason},%{NUMBER:dg_hier_level_1},%{NUMBER:dg_hier_level_2},%{NUMBER:dg_hier_level_3},%{NUMBER:dg_hier_level_4},%{DATA:vsys_name},%{DATA:device_name},%{DATA:action_source},%{DATA:src_uuid},%{DATA:dst_uuid},%{NUMBER:tunnelid},%{DATA:monitortag},%{NUMBER:parent_session_id},%{DATA:parent_start_time},%{DATA:tunnel},%{NUMBER:assoc_id},%{NUMBER:chunks},%{NUMBER:chunks_sent},%{NUMBER:chunks_received},%{DATA:rule_uuid},%{NUMBER:http}",
      
      "SYSTEM" : "%{GREEDYDATA:ignore} - - - - 1,%{DATA:receive_time},%{NUMBER:serial},%{WORD:type},%{WORD:subtype},%{NUMBER:config_version},%{DATA:time_generated},%{WORD:vsys},%{DATA:eventid},%{DATA:fmt},%{DATA:id},%{WORD:module},%{WORD:severity},%{QS:opaque},%{NUMBER:seqno},%{DATA:actionflags},%{DATA:dg_hier_level_1},%{DATA:dg_hier_level_2},%{DATA:dg_hier_level_3},%{DATA:dg_hier_level_4},%{DATA:vsys_name},%{GREEDYDATA:device_name}"
      
    },
    "ignore_missing" : true,
    "ignore_failure" : true
  }
},
{
  "grok": {
    "field": "message",
    "patterns": ["%{THREAT:threat}"],
    "pattern_definitions": {
      
      "THREAT": "%{GREEDYDATA:ignore} - - - - 1,%{DATA:receive_time},%{NUMBER:serial},%{WORD:type},%{WORD:subtype},%{NUMBER:config_version},%{DATA:time_generated},%{IP:src},%{IP:dst},%{IP:natsrc},%{IP:natdst},%{DATA:rule},%{DATA:srcuser},%{DATA:dstuser},%{WORD:app},%{WORD:vsys},%{DATA:from},%{DATA:to},%{DATA:inbound_if},%{DATA:outbound_if},%{DATA:logset},%{DATA:FUTURE_USE},%{NUMBER:sessionid},%{NUMBER:repeatcnt},%{NUMBER:sport},%{NUMBER:dport},%{NUMBER:natsport},%{NUMBER:natdport},%{WORD:flags},%{DATA:proto},%{WORD:action},%{DATA:misc},%{DATA:threatid},%{DATA:category},%{WORD:severity},%{DATA:direction},%{NUMBER:seqno},%{DATA:actionflags},%{DATA:srcloc},%{DATA:dstloc},%{NUMBER:cpadding},%{DATA:contenttype},%{DATA:pcap_id},%{DATA:filedigest},%{DATA:cloud},%{DATA:url_idx},%{DATA:user_agent},%{DATA:filetype},%{DATA:xff},%{DATA:referer},%{DATA:sender},%{DATA:subject},%{DATA:recipient},%{DATA:reportid},%{DATA:dg_hier_level_1},%{DATA:dg_hier_level_2},%{DATA:dg_hier_level_3},%{DATA:dg_hier_level_4},%{DATA:vsys_name},%{DATA:device_name},%{DATA:file_url},%{DATA:src_uuid},%{DATA:dst_uuid},%{DATA:http_method},%{NUMBER:tunnel_id},%{DATA:monitortag},%{DATA:parent_session_id},%{DATA:parent_start_time},%{WORD:tunnel},%{DATA:thr_category},%{DATA:six_flags},%{DATA:assoc_id},%{NUMBER:ppid},%{DATA:http_headers},%{QS:url_category},%{DATA:rule_uuid},%{NUMBER:http}"
      
    }, 
    "ignore_missing": true,
    "ignore_failure": true
  }
},
{
  "geoip" : {
    "field" : "dst",
    "ignore_failure": true
  }
},
{
  "geoip" : {
    "field" : "src",
    "ignore_failure" : true
  }
},
{
  "geoip" : {
    "field" : "natsrc",
    "ignore_failure": true
  }
},
{
  "geoip" : {
    "field" : "natdst",
    "ignore_failure": true
  }
},
{
  "convert": {
    "field": "bytes",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "bytes_received",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "bytes_sent",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "packets",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "pkts_received",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "pkts_sent",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "chunks",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "chunks_received",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "chunks_sent",
    "type": "float",
    "ignore_failure": true,
    "ignore_missing": true
  }
},
{
  "date" : {
    "field" : "receive_time",
    "formats" : ["YYYY/MM/DD HH:mm:ss"],
    "ignore_failure": true
  }
},
{
  "date" : {
    "field" : "start",
    "formats" : ["YYYY/MM/DD HH:mm:ss"],
    "ignore_failure": true
  }
},
{
  "date" : {
    "field" : "time_generated",
    "formats" : ["YYYY/MM/DD HH:mm:ss"],
    "ignore_failure": true
  }
},
{
  "remove": {
    "field": "ignore",
    "ignore_failure": true
  }
},
{
  "remove": {
    "field": "threat",
    "ignore_failure": true
  }
},
{
  "remove": {
    "field": "system",
    "ignore_failure": true
  }
},
{
  "remove": {
    "field": "traffic",
    "ignore_failure": true
  }
}
]
}

Simulate works when I test the messages.
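For reference, this is roughly how I have been testing (a sketch; the message value is just a placeholder for one raw Palo Alto log line):

POST _ingest/pipeline/TG/_simulate
{
  "docs": [
    { "_source": { "message": "<one raw PAN-OS log line>" } }
  ]
}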

Hi @themarcusaurelius, welcome to the forum.

A couple of thoughts / simple questions; ingest pipelines can be a bit hard to debug.

Have you validated that the pipeline exists?

GET _ingest/pipeline/my_pipeline_id

Have you actually manually ingested a few docs through the pipeline (not via simulate) and confirmed that they end up in the index correctly?

POST my-index/_doc?pipeline=my_pipeline_id
{
  "field" : "value",
   ....
}
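Once that doc is in, a quick way to see whether the pipeline actually ran is to search for one of the fields it should have added, for example (a sketch; substitute any grok field you expect to be populated):

GET my-index/_search
{
  "query": {
    "exists": { "field": "src" }
  }
}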

How are you ingesting the logs, via Logstash or Filebeat?

How are you defining which pipeline to use?
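As a fallback, you can also attach the pipeline to the index itself so it runs regardless of what the client sends, via the default_pipeline index setting (a sketch, assuming the index already exists):

PUT my-index/_settings
{
  "index.default_pipeline": "my_pipeline_id"
}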


Do you have any logs in Logstash telling you that documents do not go through?

What I usually do when it comes to pipelines is to add a failure path at the end, so that failed documents are saved in another index with an error message and you can review them:

Add the following on_failure section at the end, after the processors section:

  "on_failure": [
    {
      "append": {
        "field": "meta.errors",
        "value": "{{ _ingest.on_failure_message }}, {{ _ingest.on_failure_processor_type }}, {{ _ingest.on_failure_processor_tag }}"
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]

Then, run your pipeline and check if you have anything landing in failed-my-index.
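For example, to pull back the error messages on anything that landed in the failure index (a sketch):

GET failed-*/_search
{
  "_source": ["meta.errors", "message"]
}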


Stephen,

Thanks for the reply. The pipeline has been validated and does exist.

I will try to ingest some manually and see if it works!

I am ingesting directly through Filebeat by adding the pipeline name to output.elasticsearch:

output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["removed"]
  pipeline: "TG"
  index: "tgen-%{+yyyy.MM.dd}"
setup.template.name: "tgen"
setup.template.pattern: "tgen*"

Off the top of my head, could the pipeline be failing because the bulk requests are taking too long and Filebeat is tripping up?
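If that turns out to be the case, one thing I could try (just a sketch, the values are guesses) is raising the request timeout and lowering the batch size in the output:

output.elasticsearch:
  hosts: ["removed"]
  pipeline: "TG"
  timeout: 120        # seconds per bulk request, default is 90
  bulk_max_size: 200  # fewer events per bulk request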

Thanks Val, I will add the on_failure section and try it out.

@themarcusaurelius
Did you find a solution for this? I am facing a similar issue.

I had this same issue. In my case I was installing a Filebeat module (nginx) and was trying to run a custom pipeline against the nginx data.

The problem was that the nginx module installs several pipelines, and those pipelines were taking precedence, so my pipeline was never run.

Here are some suggestions for debugging where your issue is happening:

  1. Run GET /_nodes/stats
    (https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
    Search the results for your pipeline name and check the count. In my case, the count was 0 for my pipeline, which told me it wasn't being run at all (see the sketch after this list).
  2. If you have multiple pipelines, try calling a pipeline from within another pipeline that you know is working: https://www.elastic.co/guide/en/elasticsearch/reference/current/pipeline-processor.html
  3. You can also use final_pipeline to call a final pipeline after everything else has run: https://www.elastic.co/guide/en/elasticsearch/reference/master/index-modules.html#dynamic-index-settings
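For the first check, you can narrow the stats output down to just the ingest counters (a sketch; filter_path only trims the response):

GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines

If your pipeline shows a count of 0 there, it is never being invoked, so the problem is in how the pipeline is selected rather than in the pipeline itself.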

In my Filebeat config file, I was calling my pipeline via the output section:
(https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html#pipeline-option-es)

output.elasticsearch.pipelines:
  - pipeline: my_custom_pipeline
    when.contains:
      event.module: "nginx"

However, this never called my pipeline. So, I moved the pipeline up to the nginx module entry itself:

config:
  - module: nginx
    access:
      input:
        pipeline: my_custom_pipeline

After that, everything started working as expected. So it seems my issue was that Elasticsearch was running the default pipelines installed by the nginx module and never running my custom pipeline, until I moved the pipeline setting into the nginx module config itself.
