Parsing AWS VPC Flow or CloudTrail logs is not working. When I test the pipeline auto-generated by Filebeat (see the simulate call after the message sample below), it returns the error: field [event.original] already exists.
Processors:
[
{
"drop": {
"if": "ctx.message.startsWith(\"version\") || ctx.message.startsWith(\"instance-id\")"
}
},
{
"set": {
"field": "event.ingested",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"value": "8.0.0",
"field": "ecs.version"
}
},
{
"rename": {
"field": "message",
"target_field": "event.original",
"ignore_missing": true
}
},
{
"set": {
"field": "event.kind",
"value": "event"
}
},
{
"set": {
"field": "event.category",
"value": [
"network"
]
}
},
{
"set": {
"value": "aws",
"field": "cloud.provider"
}
},
{
"set": {
"copy_from": "cloud.account.id",
"field": "aws.vpcflow.account_id",
"ignore_empty_value": true
}
},
{
"set": {
"ignore_empty_value": true,
"copy_from": "cloud.instance.id",
"field": "aws.vpcflow.instance_id"
}
},
{
"uppercase": {
"ignore_missing": true,
"field": "event.action",
"target_field": "aws.vpcflow.action"
}
},
{
"geoip": {
"field": "source.ip",
"target_field": "source.geo",
"ignore_missing": true
}
},
{
"geoip": {
"ignore_missing": true,
"field": "destination.ip",
"target_field": "destination.geo"
}
},
{
"geoip": {
"properties": [
"asn",
"organization_name"
],
"ignore_missing": true,
"database_file": "GeoLite2-ASN.mmdb",
"field": "source.ip",
"target_field": "source.as"
}
},
{
"geoip": {
"properties": [
"asn",
"organization_name"
],
"ignore_missing": true,
"database_file": "GeoLite2-ASN.mmdb",
"field": "destination.ip",
"target_field": "destination.as"
}
},
{
"rename": {
"target_field": "source.as.number",
"ignore_missing": true,
"field": "source.as.asn"
}
},
{
"rename": {
"field": "source.as.organization_name",
"target_field": "source.as.organization.name",
"ignore_missing": true
}
},
{
"rename": {
"ignore_missing": true,
"field": "destination.as.asn",
"target_field": "destination.as.number"
}
},
{
"rename": {
"field": "destination.as.organization_name",
"target_field": "destination.as.organization.name",
"ignore_missing": true
}
},
{
"remove": {
"field": "event.original",
"if": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
"ignore_failure": true,
"ignore_missing": true
}
}
]
Failure processors:
[
{
"set": {
"value": "{{{ _ingest.on_failure_message }}}",
"field": "error.message"
}
}
]
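If I read the error right, the rename from message to event.original in the processor list above is the one that throws, because a rename processor fails when its target field already exists. As a sketch only (untested, and filebeat setup --pipeline would overwrite any manual patch anyway), a guard like this should skip the rename when event.original is already present on the incoming event:

{
  "rename": {
    "field": "message",
    "target_field": "event.original",
    "ignore_missing": true,
    "if": "ctx.event?.original == null"
  }
}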
Message sample:
0 000000000000 eni-000ebc0f000dfe000 000.00.000.000 000.00.00.00 00000 00000 0 0 0000 0000000000 0000000000 ACCEPT OK
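This is how I test it with the simulate API. The pipeline name below is a placeholder for whatever version the setup generated in your stack, and pre-filling event.original is only my assumption about how the incoming event can trigger the duplicate-field error:

POST _ingest/pipeline/filebeat-8.6.2-aws-vpcflow-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "0 000000000000 eni-000ebc0f000dfe000 000.00.000.000 000.00.00.00 00000 00000 0 0 0000 0000000000 0000000000 ACCEPT OK",
        "event": {
          "original": "0 000000000000 eni-000ebc0f000dfe000 000.00.000.000 000.00.00.00 00000 00000 0 0 0000 0000000000 0000000000 ACCEPT OK"
        }
      }
    }
  ]
}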
P.S.: if we handle the parsing manually in Logstash using grok, it works. But I would like the ingest pipeline auto-generated by filebeat setup --pipeline to work as-is, to avoid breaking the default dashboards when upgrading ELK versions.
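For reference, this is roughly the grok we run in Logstash, written here as the equivalent ingest grok processor for comparison. The VPC Flow v2 column order (version, account-id, interface-id, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start, end, action, log-status) is standard, but the target field names are my own mapping, not necessarily what the Filebeat module expects:

{
  "grok": {
    "field": "message",
    "patterns": [
      "%{NUMBER:aws.vpcflow.version} %{NOTSPACE:cloud.account.id} %{NOTSPACE:aws.vpcflow.interface_id} %{IP:source.ip} %{IP:destination.ip} %{NUMBER:source.port:int} %{NUMBER:destination.port:int} %{NUMBER:network.iana_number} %{NUMBER:source.packets} %{NUMBER:network.bytes} %{NUMBER:aws.vpcflow.start} %{NUMBER:aws.vpcflow.end} %{WORD:event.action} %{WORD:aws.vpcflow.log_status}"
    ],
    "ignore_missing": true
  }
}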