AWS ingest pipeline error in rename processor: "message" to "event.original"

Parsing AWS vpcflow or cloudtrail logs is not working. When testing the pipeline auto-generated by Filebeat, it returns an error: field [event.original] already exists.

Processors:

[
  {
    "drop": {
      "if": "ctx.message.startsWith(\"version\") || ctx.message.startsWith(\"instance-id\")"
    }
  },
  {
    "set": {
      "field": "event.ingested",
      "value": "{{_ingest.timestamp}}"
    }
  },
  {
    "set": {
      "value": "8.0.0",
      "field": "ecs.version"
    }
  },
  {
    "rename": {
      "field": "message",
      "target_field": "event.original",
      "ignore_missing": true
    }
  },
  {
    "set": {
      "field": "event.kind",
      "value": "event"
    }
  },
  {
    "set": {
      "field": "event.category",
      "value": [
        "network"
      ]
    }
  },
  {
    "set": {
      "value": "aws",
      "field": "cloud.provider"
    }
  },
  {
    "set": {
      "copy_from": "cloud.account.id",
      "field": "aws.vpcflow.account_id",
      "ignore_empty_value": true
    }
  },
  {
    "set": {
      "ignore_empty_value": true,
      "copy_from": "cloud.instance.id",
      "field": "aws.vpcflow.instance_id"
    }
  },
  {
    "uppercase": {
      "ignore_missing": true,
      "field": "event.action",
      "target_field": "aws.vpcflow.action"
    }
  },
  {
    "geoip": {
      "field": "source.ip",
      "target_field": "source.geo",
      "ignore_missing": true
    }
  },
  {
    "geoip": {
      "ignore_missing": true,
      "field": "destination.ip",
      "target_field": "destination.geo"
    }
  },
  {
    "geoip": {
      "properties": [
        "asn",
        "organization_name"
      ],
      "ignore_missing": true,
      "database_file": "GeoLite2-ASN.mmdb",
      "field": "source.ip",
      "target_field": "source.as"
    }
  },
  {
    "geoip": {
      "properties": [
        "asn",
        "organization_name"
      ],
      "ignore_missing": true,
      "database_file": "GeoLite2-ASN.mmdb",
      "field": "destination.ip",
      "target_field": "destination.as"
    }
  },
  {
    "rename": {
      "target_field": "source.as.number",
      "ignore_missing": true,
      "field": "source.as.asn"
    }
  },
  {
    "rename": {
      "field": "source.as.organization_name",
      "target_field": "source.as.organization.name",
      "ignore_missing": true
    }
  },
  {
    "rename": {
      "ignore_missing": true,
      "field": "destination.as.asn",
      "target_field": "destination.as.number"
    }
  },
  {
    "rename": {
      "field": "destination.as.organization_name",
      "target_field": "destination.as.organization.name",
      "ignore_missing": true
    }
  },
  {
    "remove": {
      "field": "event.original",
      "if": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
      "ignore_failure": true,
      "ignore_missing": true
    }
  }
]

Failure processors:

[
  {
    "set": {
      "value": "{{{ _ingest.on_failure_message }}}",
      "field": "error.message"
    }
  }
]

Message sample:

0 000000000000 eni-000ebc0f000dfe000 000.00.000.000 000.00.00.00 00000 00000 0 0 0000 0000000000 0000000000 ACCEPT OK

P.S.: if we handle the parsing manually in Logstash using grok, it works. But I would like the ingest pipeline auto-generated by filebeat setup --pipeline to work, to avoid breaking the default dashboards when we update the ELK version.
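For reference, a grok for the default version-2 vpcflow format looks roughly like this (field names are illustrative, not the module's ECS names, and the exact pattern in our Logstash config may differ):

filter {
  grok {
    # Matches the default version-2 VPC flow record (14 space-separated fields);
    # fields that can be "-" in some records would need NOTSPACE instead of NUMBER
    match => {
      "message" => "%{NUMBER:version} %{NOTSPACE:account_id} %{NOTSPACE:interface_id} %{NOTSPACE:srcaddr} %{NOTSPACE:dstaddr} %{NUMBER:srcport} %{NUMBER:dstport} %{NUMBER:protocol} %{NUMBER:packets} %{NUMBER:bytes} %{NUMBER:start} %{NUMBER:end} %{WORD:action} %{WORD:log_status}"
    }
  }
}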

Are you sending the logs directly from Filebeat to Elasticsearch, or are you sending them from Filebeat to Logstash and then to Elasticsearch?

Hi! Thank you for the quick response. I am using the following flow: Filebeat to Logstash and then to Elasticsearch.

Yeah, Filebeat modules expect that you send the data directly to Elasticsearch; when you use Logstash, it may change the original message, which can give you some issues.

If you are using Logstash 8+, by default it will add ECS fields to the messages it receives, so the event.original is being added by Logstash.

You may remove this field in your Logstash pipeline, or disable ecs_compatibility for that pipeline using pipeline.ecs_compatibility: disabled in the pipelines.yml file.
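For example, in pipelines.yml (a minimal sketch; the pipeline id and config path below are placeholders for your own setup):

# pipelines.yml: stop Logstash from adding ECS fields such as event.original
- pipeline.id: beats                              # placeholder id
  path.config: "/etc/logstash/conf.d/beats.conf"  # placeholder path
  pipeline.ecs_compatibility: disabled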


@diogoeverson Welcome to the community!

Also, when you set up the pipelines with Filebeat you really want to set up all the assets, not just the pipelines: dashboards, index templates, etc.

So just run the following..

filebeat setup -e

That will load everything (all the assets).


@stephenb, thank you for the quick response. Unfortunately your tip did not work. Could it be because we are using the flow Filebeat to Logstash and then to Elasticsearch? As @leandrojmp suggested, maybe we should use the Filebeat-to-Elasticsearch flow for this to work automatically?

@leandrojmp, thank you for the help.

In this case, the field being added is not, in itself, an issue for me. What is an issue is that the parsing of the message is not happening. Do you think that is because of this error? Even after removing the processor directly in the pipeline in Kibana, and handling the impact of that on the other processors of the ingest pipeline, the parsing did not happen. I will try to remove the field in the Logstash beats.conf, along the lines of the sketch below.
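A minimal sketch of that removal, assuming a standard beats.conf with a filter section:

filter {
  # Drop the event.original that Logstash 8+ adds, so the ingest
  # pipeline's rename of "message" can succeed
  mutate {
    remove_field => ["[event][original]"]
  }
}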

But it is an issue for the ingest pipeline.

Filebeat modules use ingest pipelines in Elasticsearch, so it is expected that the original message collected will be sent directly to Elasticsearch.

When you add Logstash between Filebeat and Elasticsearch, the original message can change, and this can break the ingest pipeline at multiple points.

For example, in the cloudtrail ingest pipeline, you have this processor at the beginning:

  - rename:
      field: "message"
      target_field: "event.original"

If the message arriving at Elasticsearch already has a field named event.original, the pipeline will fail here and the remaining processors will not be executed.

By default, Logstash 8+ will add an event.original field, which will probably break a lot of ingest pipelines, so you need to remove it as mentioned in the previous answer.
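You can reproduce the failure with the simulate API (a minimal sketch; the sample document below stands in for what Logstash 8+ would send):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "rename": { "field": "message", "target_field": "event.original" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "0 000000000000 eni-000ebc0f000dfe000 ... ACCEPT OK",
        "event": { "original": "already set by Logstash ecs_compatibility" }
      }
    }
  ]
}

The response for that document is an error like field [event.original] already exists instead of a parsed event.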

One thing, though: why are you using Logstash? You can't change the original message, so Logstash will just act as a proxy to Elasticsearch in this case; it would be better to just send the data directly to Elasticsearch.

Also, if you are just starting to collect logs with the Elastic Stack, I would recommend that you look into using the Elastic Agent and Fleet.

Filebeat modules are not being kept up to date and will probably be deprecated in the future. For example, the cloudtrail ingest pipeline in the Elastic Agent will not fail if the event.original field already exists.
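For illustration, a defensive version of that rename, roughly the pattern the agent pipeline uses (a sketch; the exact processors in the agent pipeline may differ):

  - rename:
      field: message
      target_field: event.original
      ignore_missing: true
      if: "ctx.event?.original == null"
  - remove:
      # if event.original was already set upstream, just drop the duplicate message
      field: message
      ignore_missing: true
      if: "ctx.event?.original != null"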

BTW, when using modules I personally suggest getting

Filebeat > Elasticsearch

working first, before moving to

Filebeat > Logstash > Elasticsearch

And your Logstash pipeline needs to be of the form sketched below.
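A minimal sketch of that form (the port and hosts are placeholders), forwarding Filebeat's metadata so Elasticsearch applies the module's ingest pipeline:

input {
  beats {
    port => 5044
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # route each event to the module's index and ingest pipeline
    index => "%{[@metadata][beat]}-%{[@metadata][version]}"
    pipeline => "%{[@metadata][pipeline]}"
  }
}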

As @leandrojmp stated, the event.original field is an issue that needs to be corrected / accounted for.

I also agree: unless there is a particular reason for using Logstash, you are just adding complexity.
