Elasticsearch Ingest Pipeline Works with Test Documents, but Fails in Real Usage

Hi All,

Elasticsearch: 7.16.2
Logstash: 7.16.2

I have an ingest pipeline which processes syslog messages:

Example message:

<86>Feb 1 18:38:07 ASDF1234 sudo: pam_unix(sudo:session): session closed for user root

When using the ingest pipeline's test documents feature, the pipeline processes this message into the following:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_doc",
        "_id": "_id",
        "_source": {
          "process": {
            "name": [
              "pam_unix",
              "sudo"
            ]
          },
          "@timestamp": "2022-02-01T18:38:07.000Z",
          "host": {
            "name": "ASDF1234",
            "hostname": "ASDF1234"
          },
          "message": "pam_unix(sudo:session): session closed for user root",
          "event": {
            "ingested": "2022-02-01T18:44:17.609479574Z",
            "original": "<86>Feb 1 18:38:07 ASDF1234 sudo: pam_unix(sudo:session): session closed for user root",
            "kind": "event",
            "module": "app",
            "action": "session",
            "category": "process",
            "type": "info",
            "dataset": "app.info",
            "outcome": "unknown"
          },
          "user": {
            "name": "root",
            "id": "root"
          }
        },
        "_ingest": {
          "timestamp": "2022-02-01T18:44:17.609479574Z"
        }
      }
    }
  ]
}
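
For reference, this is roughly the equivalent simulate call in Dev Tools (the doc is abbreviated to just the message field):

POST _ingest/pipeline/app-ingress/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "<86>Feb 1 18:38:07 ASDF1234 sudo: pam_unix(sudo:session): session closed for user root"
      }
    }
  ]
}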

However, when Logstash forwards the syslog message to the Elasticsearch ingest pipeline, I get the following error:

[2022-02-01T18:38:08,563][WARN ][logstash.outputs.elasticsearch][syslog][elasticsearch_pipeline_output] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-app.info-production", :routing=>nil, :pipeline=>"app-ingress"}, {"event"=>{"module"=>"app", "original"=>"<86>Feb 1 18:38:08 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)"}, "message"=>"<86>Feb 1 18:38:08 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)", "@version"=>"1", "data_stream"=>{"type"=>"logs", "namespace"=>"production", "dataset"=>"app.info"}, "@timestamp"=>2022-02-01T18:38:08.397Z, "tags"=>["app"]}], :response=>{"create"=>{"_index"=>".ds-logs-app.info-production-2022.02.01-000008", "_type"=>"_doc", "_id"=>"TQ2Utn4B6WtKNMd0cACu", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [user] tried to parse field [user] as object, but found a concrete value"}}}}

The Logstash pipeline is straightforward:

input {
    tcp {
        port => 10006
        tags => ["app"]
        mode => "server"
        add_field => {"[event][module]" => "app"}
        add_field => {"[data_stream][type]" => "logs"}
        add_field => {"[data_stream][dataset]" => "app.info"}
        add_field => {"[data_stream][namespace]" => "production"}
        add_field => {"[@metadata][pipeline]" => "app-ingress"}
        id => "app_tcp_syslog_input"
    }
}
output {
    if [@metadata][pipeline] {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_OUTPUT_HOSTS}"]
            action => "create"
            ssl_certificate_verification => false
            index => ["%{[data_stream][type]}-%{[data_stream][dataset]}-%{[data_stream][namespace]}"]
            ssl => true
            cacert => "${ELASTICSEARCH_CA}"
            api_key => "${LOGSTASH_ADMIN_API_KEY}"
            pipeline => "%{[@metadata][pipeline]}"
            id => "elasticsearch_pipeline_output"
        }
    }
}

Given that user in the test output is an object and not a concrete value, does anyone have any ideas about what is going on here, or how to better debug the issue?

Hi,

Is the structure of the 'real usage' output exactly the same as in the test?
You can check it by using the stdout output plugin, for example:

output {
  stdout { codec => rubydebug}
}

I'm not sure, since I don't use Logstash in combination with Elasticsearch ingest pipelines, but I think that if your ingest pipeline fails and cannot index the document, it will return an error to Logstash.

This could explain why you are receiving a mapping error for a field that does not exist when the document leaves the Logstash pipeline.

Can you share your ingest pipeline?

Check the mapping of your index and you will most likely find that the field user is defined as either text or keyword, not as an object with sub-fields. That's what the error message is telling you.

That message tells you that what you're trying to write and the mapping are a mismatch.

Pipelines run before the actual write. The error occurs when the document is actually being written and it does not match the index mapping.
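
For example, something along these lines in Dev Tools (using your data stream name) should show how user is currently mapped:

GET logs-app.info-production/_mapping/field/user,user.*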


@Tomo_M Logstash sends the following doc to Elasticsearch (taken from the rubydebug output):

{
    "tags" => [
        [0] "app"
    ],
    "data_stream" => {
        "dataset" => "app.info",
        "type" => "logs",
        "namespace" => "production"
    },
    "@version" => "1",
    "message" => "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)",
    "event" => {
        "module" => "app",
        "original" => "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)"
    },
    "@timestamp" => 2022-02-02T14:08:56.627Z
}

Converting that into JSON:

[
  {
    "_source": {
      "tags": [
        "app"
      ],
      "data_stream": {
        "dataset": "app.info",
        "type": "logs",
        "namespace": "production"
      },
      "@version": "1",
      "message": "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)",
      "event": {
        "module": "app",
        "original": "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)"
      },
      "@timestamp": "2022-02-02T14:08:56.627Z"
    }
  }
]

Then running through the test:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_doc",
        "_id": "_id",
        "_source": {
          "process": {
            "name": [
              "pam_unix",
              "sudo"
            ]
          },
          "@timestamp": "2022-02-02T14:08:56.000Z",
          "data_stream": {
            "namespace": "production",
            "type": "logs",
            "dataset": "app.info"
          },
          "@version": "1",
          "host": {
            "name": "ASDF1234",
            "hostname": "ASDF1234"
          },
          "message": "pam_unix(sudo:session): session opened for user root by (uid=0)",
          "event": {
            "ingested": "2022-02-02T14:21:33.778782083Z",
            "original": "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)",
            "kind": "event",
            "module": "app",
            "action": "session",
            "category": "process",
            "type": "info",
            "dataset": "app.info",
            "outcome": "unknown"
          },
          "user": {
            "name": "opened",
            "id": [
              "opened",
              "0"
            ]
          },
          "tags": [
            "app"
          ]
        },
        "_ingest": {
          "timestamp": "2022-02-02T14:21:33.778782083Z"
        }
      }
    }
  ]
}

Results in the correct processing.

@stephenb The backing index is a data stream; running the following command:

GET /logs-app.info-production/_mapping/field/user

Results in no concrete mappings for user:

{
  "restored-.ds-logs-app.info-production-2021.10.12-000003" : {
    "mappings" : { }
  },
  "restored-.ds-logs-app.info-production-2021.11.11-000004" : {
    "mappings" : { }
  },
  ".ds-logs-app.info-production-2022.01.10-000006" : {
    "mappings" : { }
  },
  "restored-.ds-logs-app.info-production-2021.09.11-000002" : {
    "mappings" : { }
  },
  ".ds-logs-app.info-production-2021.12.11-000005" : {
    "mappings" : { }
  },
  ".ds-logs-app.info-production-2022.02.01-000008" : {
    "mappings" : { }
  },
  "restored-.ds-logs-app.info-production-2021.08.12-000001" : {
    "mappings" : { }
  }
}

Running:

GET /logs-app.info-production/_mapping/field/user.*

This outputs the correct ECS mappings for user.*

@leandrojmp here is the Ingest Pipeline:

{
  "version": 1,
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "set": {
        "field": "event.original",
        "value": "{{_source.message}}",
        "ignore_empty_value": true
      }
    },
    {
      "set": {
        "field": "event.category",
        "value": "process"
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "value": "app.info"
      }
    },
    {
      "set": {
        "field": "event.kind",
        "value": "event"
      }
    },
    {
      "set": {
        "field": "event.module",
        "value": "app"
      }
    },
    {
      "set": {
        "field": "event.type",
        "value": "info"
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{SYSLOGBASE} %{DATA:message}$",
          "%{SYSLOGPROG} <%{NONNEGINT:facility}>%{INT} %{TIMESTAMP_ISO8601:timestamp} %{SYSLOGHOST:logsource} %{DATA:message}$"
        ],
        "ignore_missing": true
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{DATA:program} %{POSINT:pid} %{WORD:app} \\[%{DATA:kvp_1}\\]\\[%{DATA:kvp_2}\\] %{DATA:sub_message}$",
          "%{USERNAME:program}\\(%{USERNAME:subprocess}:%{WORD:action}\\): %{DATA} %{USERNAME:user}$",
          "%{USERNAME:program}\\(%{USERNAME:subprocess}:%{WORD:action}\\): %{DATA} %{USERNAME:user} %{DATA} \\(uid=%{INT:user_id}\\)$",
          "%{DATA} %{IP:source.ip}$",
          "%{USERNAME:username} : %{DATA:kvp}$"
        ],
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "pid",
        "target_field": "process.pid",
        "ignore_missing": true
      }
    },
    {
      "convert": {
        "field": "process.pid",
        "type": "long",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "program",
        "target_field": "process.name",
        "ignore_missing": true
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": [
          "MMM d HH:mm:ss",
          "ISO8601"
        ],
        "target_field": "@timestamp"
      }
    },
    {
      "remove": {
        "field": "timestamp",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "logsource",
        "target_field": "host.name",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": [
          "host.address",
          "port"
        ],
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "host.hostname",
        "value": "{{_source.host.name}}"
      }
    },
    {
      "kv": {
        "field": "kvp",
        "field_split": "\\s;\\s",
        "value_split": "=",
        "target_field": "app.kvp",
        "ignore_missing": true,
        "strip_brackets": true
      }
    },
    {
      "remove": {
        "field": "kvp",
        "ignore_missing": true
      }
    },
    {
      "kv": {
        "field": "kvp_1",
        "field_split": "\\s",
        "value_split": "(=|@)",
        "target_field": "app.kvp",
        "ignore_missing": true,
        "strip_brackets": true
      }
    },
    {
      "remove": {
        "field": "kvp_1",
        "ignore_missing": true
      }
    },
    {
      "kv": {
        "field": "kvp_2",
        "field_split": "\\s",
        "value_split": "(=|@)",
        "target_field": "app.kvp",
        "ignore_missing": true,
        "strip_brackets": true
      }
    },
    {
      "remove": {
        "field": "kvp_2",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "action",
        "target_field": "event.action",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "user",
        "target_field": "user.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.USER",
        "target_field": "user.id",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "user.name",
        "value": "{{_source.user.id}}",
        "ignore_empty_value": true
      }
    },
    {
      "append": {
        "field": "process.name",
        "value": [
          "{{_source.subprocess}}"
        ],
        "if": "ctx?.subprocess != null"
      }
    },
    {
      "remove": {
        "field": "subprocess",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.PWD",
        "target_field": "process.working_directory",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.COMMAND",
        "target_field": "process.command_line",
        "ignore_missing": true
      }
    },
    {
      "append": {
        "field": "user.name",
        "value": [
          "{{_source.username}}"
        ],
        "if": "ctx?.username != null"
      }
    },
    {
      "remove": {
        "field": "username",
        "ignore_missing": true
      }
    },
    {
      "append": {
        "field": "user.id",
        "value": [
          "{{_source.user_id}}"
        ],
        "if": "ctx?.user_id != null"
      }
    },
    {
      "remove": {
        "field": "user_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.eventId",
        "target_field": "event.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.eventSeverity",
        "target_field": "log.level",
        "ignore_missing": true
      }
    },
    {
      "lowercase": {
        "field": "log.level",
        "target_field": "log.level",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.locationName",
        "target_field": "host.geo.name",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": "host.geo",
        "ignore_missing": true,
        "if": "ctx?.host?.geo?.name != null && ctx.host.geo.name == '-'"
      }
    },
    {
      "rename": {
        "field": "app.kvp.eventType",
        "target_field": "event.action",
        "ignore_missing": true
      }
    },
    {
      "lowercase": {
        "field": "event.action",
        "target_field": "event.action",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.nodeIpAddress",
        "target_field": "host.ip",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "event.status",
        "value": "success",
        "ignore_empty_value": true,
        "if": "ctx?.app?.kvp?.status != null && ctx.app.kvp.status == 'success'"
      }
    },
    {
      "rename": {
        "field": "facility",
        "target_field": "log.syslog.facility.code",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.profile",
        "target_field": "app.profile",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.ndc",
        "target_field": "app.ndc",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.mdc",
        "target_field": "app.mdc",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.parentSpanId",
        "target_field": "app.span.parent.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.spanId",
        "target_field": "app.span.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.objectType",
        "target_field": "app.object.type",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.objectName",
        "target_field": "app.object.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.objectId",
        "target_field": "app.object.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.jobId",
        "target_field": "app.job.id",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": "app.job.id",
        "ignore_missing": true,
        "if": "ctx?.app?.job?.id != null && ctx.app.job.id == \"\""
      }
    },
    {
      "rename": {
        "field": "app.kvp.jobType",
        "target_field": "app.job.type",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "if": "ctx?.app?.job?.type != null && ctx.app.job.type == \"\"",
        "field": "app.job.type",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": "app.job",
        "ignore_missing": true,
        "if": "ctx?.app?.job != null && ctx.app.job.isEmpty()"
      }
    },
    {
      "rename": {
        "field": "app.kvp.instanceId",
        "target_field": "app.instance.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.clusterName",
        "target_field": "app.cluster.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.eventName",
        "target_field": "app.event.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.eventDetail",
        "target_field": "app.event.detail",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.eventSeriesId",
        "target_field": "app.event.series.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.taskId",
        "target_field": "app.task.id",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": "app.task",
        "ignore_missing": true,
        "if": "ctx?.app?.task?.id != null && ctx.app.task.id == \"\""
      }
    },
    {
      "rename": {
        "field": "app.kvp.status",
        "target_field": "app.status",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.tracerId",
        "target_field": "app.tracer.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp.nodeId",
        "target_field": "app.node.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "sub_message",
        "target_field": "app.message",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "app.kvp",
        "target_field": "app.unknown",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": "app.unknown",
        "ignore_missing": true,
        "if": "ctx?.app?.unknown != null && ctx.app.unknown.isEmpty()"
      }
    },
    {
      "rename": {
        "field": "app",
        "target_field": "process.title",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "event.outcome",
        "value": "unknown",
        "if": "ctx?.event?.outcome == null"
      }
    },
    {
      "set": {
        "field": "event.action",
        "value": "unknown",
        "ignore_empty_value": true,
        "if": "ctx?.event?.action == null"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Hi @BenB196 I can only tell you that there is a mismatch between what you are trying to write and the allowed mapping; that is what the error is saying.

You can test it by taking that processed doc above, going to Kibana → Dev Tools, and manually POSTing it to the index; you should get the same error.

You can also test it by not just simulating the pipeline but actually posting the doc to the index and naming the pipeline:

POST my-index/_doc?pipeline=my-pipeline
{
source doc from logstash
}

Also, I have seen this as an occasional error when there are many correct logs and just a few that do not parse correctly...


@stephenb I ran:

POST logs-app.info-production/_doc?pipeline=app-ingress
{
  "tags": [
    "app"
  ],
  "data_stream": {
    "dataset": "app.info",
    "type": "logs",
    "namespace": "production"
  },
  "@version": "1",
  "message": "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)",
  "event": {
    "module": "app",
    "original": "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)"
  },
  "@timestamp": "2022-02-02T14:08:56.627Z"
}

And the document indexed successfully:

{
  "_index" : ".ds-logs-app.info-production-2022.02.01-000008",
  "_type" : "_doc",
  "_id" : "8SH1un4BcHhxoniDBrDU",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 898551,
  "_primary_term" : 1
}

OK, I'm confused, I don't see the error.

"result" : "created",

Looks like it worked

Yeah, sorry, I accidentally pasted the wrong output.

The document indexes correctly when going through the manual POST command, but fails when going through Logstash.

The exact same document? If so, you have some debugging to do... it is in there somewhere...

OR

Like I said, I have seen this where a log file has 100s or 1000s of lines and only a few lines are not parsed correctly... that is pretty common... and can be hard to find...
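
If some of those bad docs are still being indexed, your pipeline's on_failure handler writes error.message, so a query along these lines might help surface them:

GET logs-app.info-production/_search
{
  "query": {
    "exists": {
      "field": "error.message"
    }
  }
}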

Ohhh, another thought: can you try manually naming your pipeline in the Logstash output?

        pipeline => "app-ingress" 

I have an idea...

I assume you are using Filebeat... perhaps the correct pipeline name is not being passed. If you overrode the pipeline name in the Filebeat config and did not do it correctly, the pipeline may not be set correctly.


There is something that I think could be the issue.

Your grok creates the field user, and later your ingest pipeline renames this field to user.something. If your ingest pipeline fails before reaching that point, your document will be left with the user field, which will give you a mapping error, but it will also fail to index, which prevents you from seeing what the error in the ingest pipeline was.
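
One way to see exactly which processor is halting would be to run the simulate API in verbose mode against the doc Logstash sends (abbreviated here to the message field), something like:

POST _ingest/pipeline/app-ingress/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "message": "<86>Feb 2 14:08:56 ASDF1234 sudo: pam_unix(sudo:session): session opened for user root by (uid=0)"
      }
    }
  ]
}

This returns the document state after each processor, including which one failed.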

Normally all my indices have the setting "index.mapping.ignore_malformed": true so that only the field with the mapping error is rejected, not the entire document. If the entire document is being rejected by this mapping error, you could apply this setting to reject only the conflicting field and see if there is some hint in the error.message field.
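
For a regular index that setting goes into the index settings, roughly like this (for a data stream it would have to go into the index template instead):

PUT my-index
{
  "settings": {
    "index.mapping.ignore_malformed": true
  }
}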

Also, can you test removing the elasticsearch output and outputting only to a file or stdout to see what Logstash is sending?

The tcp input adds host and port fields; this host field could make some of your processors fail if it is sent along.

You could try putting this in your Logstash pipeline to check if this is the issue:

filter {
    mutate {
        remove_field => ["host", "port"]
    }
}

Thanks @leandrojmp, this helped. Unfortunately, you can't set "index.mapping.ignore_malformed": true at the global index level for data streams, as you get this error:

"causes": [
      "invalid composite mappings for [simulate_template_myv_pkkwtksbcjfhnfopea]",
      "data stream timestamp field [@timestamp] has disallowed [ignore_malformed] attribute specified"
    ]

So I did the next best thing: I changed the grok pattern so it does not parse directly into user.
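
For reference, the change was roughly along these lines, capturing into a temporary field instead of user (the tmp_user name here is just illustrative):

"%{USERNAME:program}\\(%{USERNAME:subprocess}:%{WORD:action}\\): %{DATA} %{USERNAME:tmp_user}$",
"%{USERNAME:program}\\(%{USERNAME:subprocess}:%{WORD:action}\\): %{DATA} %{USERNAME:tmp_user} %{DATA} \\(uid=%{INT:user_id}\\)$"

with a later rename of tmp_user into user.id.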

This allowed the document to at least be ingested, which allowed this error to appear:

"error.message" :  "Text 'Feb  2 15:45:46' could not be parsed at index 4"

This appears to suggest that the date being passed from Logstash has some odd formatting that works when the text is pasted directly, but fails when it comes from Logstash.

Yeah, this makes sense. Your date processor in the ingest pipeline comes after your grok processor, so when your document reaches it, the document still has the user field. Since the date processor is failing and you are not setting ignore_failure to true, the entire ingest pipeline is halted and the remaining processors are skipped, including the one that would rename the user field into user.something.

The failure in your date filter is actually pretty common and easy to solve.

You are expecting this: "MMM d HH:mm:ss", which would be Feb 2 15:45:46, but your data is really Feb  2 15:45:46; there is an extra space between the month and the single-digit day, so you need an extra space between MMM and d.

Try changing your date processor to this:

    {
      "date": {
        "field": "timestamp",
        "formats": [
          "MMM d HH:mm:ss",
          "MMM  d HH:mm:ss",
          "ISO8601"
        ],
        "target_field": "@timestamp"
      }
    }

Ahhh, thanks @leandrojmp, that makes sense. I gave it a try and that seems to have resolved my issue.

It's kind of weird/annoying that Logstash appears to log a single space on its end while Elasticsearch ends up with a double-spaced date; that makes it very hard to debug when the document never actually makes it into the index.

Going forward, I will definitely try to avoid writing to root ECS objects to prevent this kind of issue.

Thanks, all, for the help on this.

@leandrojmp Brilliant! I did not know about that setting... Today I Learned!

