AWS SES Parsing Nested json

Hi all, I need your help.
I ingest AWS SES logs from S3 buckets. The log structure varies — occasionally an event includes a field with a key/value structure —
and I'm getting errors when I try to parse it.

Here are my conf file, the resulting document, and the Logstash error log.

conf:
# Filter chain for AWS SES delivery notifications pulled from S3 (SNS envelope).
filter {
if "emails_logs" in [tags] {

# Decode the raw record into [log_data].
json {
source => "message"
target => "log_data"
}

# Emit one event per element of the Records array.
split { field => "[log_data][Records]" }

# [Sns][Message] arrives as an embedded JSON string; decode it in place.
json {
source => "[log_data][Records][Sns][Message]"
target => "[log_data][Records][Sns][Message]"
}

mutate { remove_field => [ "message" ] }

if [log_data][Records][Sns][Message][mail][headers]
{
# NOTE(review): per the result document below, [mail][headers] is an array of
# {name, value} objects, not a "key:value" delimited string. The kv filter
# parses strings, so this block presumably fails or produces nothing — this
# looks like the cause of the problem being asked about.
kv {source => "[log_data][Records][Sns][Message][mail][headers]"
value_split => ":"
default_keys => ["name", "not_exist"]

}

}
}
}

result:

{
"_index": "emails_logs29-2019.02",
"_type": "doc",
"_id": "5J_qCWkB_kihJUD5dbuo",
"_version": 1,
"_score": null,
"_source": {
"@timestamp": "2019-02-20T07:59:14.240Z",
"tags": [
"emails_logs"
],
"@version": "1",
"log_data": {
"Records": {
"Sns": {
"SigningCertUrl": "XXX",
"TopicArn": "XXX",
"Timestamp": "2019-02-12T12:23:55.725Z",
"Type": "Notification",
"UnsubscribeUrl": "XXXX",
"Signature": "XXX",
"MessageId": "XXX",
"SignatureVersion": "1",
"Subject": null,
"Message": {
"delivery": {
"processingTimeMillis": 670,
"remoteMtaIp": "00.00.00.00",
"recipients": [
"XXX.com"
],
"timestamp": "2019-02-12T12:23:55.671Z",
"smtpResponse": "00.00.00.00 Message received",
"reportingMTA": "XXX.amazonses.com"
},
"notificationType": "Delivery",
"mail": {
"messageId": "XXX-05a1206ad947-000000",
"headersTruncated": false,
"sendingAccountId": "99999976013",
"timestamp": "2019-02-12T12:23:55.001Z",
"destination": [
"XXX.amazonses.com"
],
"commonHeaders": {
"subject": "Plain 11",
"date": "12 Feb 2019 07:23:53 -0500",
"sender": "XXX.com>",
"replyTo": [
"XXX.com"
],
"from": [
"XXX.com>"
],
"to": [
"XXX.amazonses.com"
]
},
"sourceIp": "00.00.160.100",
"headers": [
{
"value": "XXX",
"name": "Received"
},
{
"value": "76",
"name": "XXXID"
},
{
"value": "1993",
"name": "XXXontactID"
},
{
"value": "1.0",
"name": "MIME-Version"
},
{
"value": "XXX.com>",
"name": "Sender"
},
{
"value": "XXX.com>",
"name": "From"
},
{
"value": "XXX.amazonses.com",
"name": "To"
},
{
"value": "XXX.com",
"name": "Reply-To"
},
{
"value": "12 Feb 2019 07:23:53 -0500",
"name": "Date"
},
{
"value": "Plain 99",
"name": "Subject"
},
{
"value": "utf-8",
"name": "Content-Type"
},
{
"value": "base64",
"name": "Content-Transfer-Encoding"
}
],
"sourceArn": "XXX.com",
"source": "YYY.com"
}
},
"MessageAttributes": {}
},
"EventSubscriptionArn": "arn-ffba33ffe044",
"EventSource": "aws:sns",
"EventVersion": "1.0"
}
}
},
"fields": {
"log_data.Records.Sns.Message.mail.timestamp": [
"2019-02-12T12:23:55.001Z"
],
"log_data.Records.Sns.Message.delivery.timestamp": [
"2019-02-12T12:23:55.671Z"
],
"@timestamp": [
"2019-02-20T07:59:14.240Z"
],
"log_data.Records.Sns.Timestamp": [
"2019-02-12T12:23:55.725Z"
]
},
"sort": [
1549974235725,
1886
]
}

logstash error:
] Failed to create monitoring event {:message=>"For path: http_address. Map keys: [:stats, :jvm, :os]", :error=>"LogStash::Instrument::MetricStore::MetricNotFound"}

This is the way to do it:

# Working approach: decode the SES/SNS payload, then flatten the headers array
# with a ruby filter instead of kv (kv cannot parse an array of hashes).
filter {
if "emails_logs" in [tags] {

            # Decode the raw record into [log_data].
            json {
                    source => "message"
                    target => "log_data"
            }
            mutate {
                    remove_field => ["message"]
            }

            # Emit one event per element of the Records array.
            split {
                    field => "[log_data][Records]"
            }

            # [Sns][Message] is an embedded JSON string; decode it in place.
            json {
                    source => "[log_data][Records][Sns][Message]"
                    target => "[log_data][Records][Sns][Message]"
            }

            if [log_data][Records][Sns][Message][mail][headers] {
                    # Flatten the array of {name, value} hashes into individual
                    # [headers][<name>] fields, then drop the original array.
                    # NOTE(review): if the same header name appears more than once
                    # (e.g. multiple Received headers), the later value overwrites
                    # the earlier one — confirm that is acceptable.
                    ruby {
                            code => "event.get('[log_data][Records][Sns][Message][mail][headers]').each {|hash| event.set('[headers][' + hash['name'] + ']', hash['value']) };
                                     event.remove('[log_data][Records][Sns][Message][mail][headers]')"
                    }
            }
    }

}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.