How to get a subfield of CloudTrail logs

Hi people, I am using Logstash to move logs from CloudTrail to Elasticsearch. Below is the conf file:

input {
  s3 {
    bucket => "test-cloudtrail"
    prefix => "AWSLogs/"
    region => "us-west-2"
    access_key_id => "AKIATRONQEXAMPLEEXA"
    secret_access_key => "B809PgsyHkqFu4KXjYrFKexAmpleExample"
    interval => 5
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => [""]
    user => "elastic"
    password => "blahblah"
    index => "aws-cloudtrail"
  }
}

This configuration sends logs to ES in the format below:

  "_index": "aws-cloudtrail",
  "_type": "_doc",
  "_id": "ZjER92oBx8r9_uotvMwh",
  "_version": 1,
  "_score": null,
  "_source": {
    "message": "{\"Records\":[{\"eventVersion\":\"1.04\",\"userIdentity\":{\"type\":\"AssumedRole\",\"principalId\":\"AROAEXAMPLESRY:ecs-service-scheduler\",\"arn\":\"arn:aws:sts::123123123123:assumed-role/ecsServiceRole/ecs-service-scheduler\",\"accountId\":\"123123123123\",\"accessKeyId\":\
    "@version": "1",
    "@timestamp": "2019-05-27T02:14:55.631Z"
  "fields": {
    "@timestamp": [
  "sort": [

My question is simple: what should I add to the logstash.conf file to get an additional field extracted from inside the "message" field? As you can see in the ES output, there is a "message" field; I want to add one more field to the ES output containing "records[SourceIpAddress]".

I tried with "mutate", but it didn't work:

filter {
  mutate {
    add_field => { "sourceIPAddress" => "%{message[Records][sourceIPAddress]}" }
  }
}

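If [message] is JSON, you can parse it with a json filter. Then, if [Records] is an array in which each element has a sourceIPAddress field, you can refer to the first element's value as [Records][0][sourceIPAddress]. The mutate attempt fails because [message] is still a plain string at that point, so it has no subfields to reference.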
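As a rough sketch of that suggestion, a filter block along these lines might work. It assumes [message] always holds valid JSON with a top-level "Records" array, as in the sample output above. The split step is an addition that was not part of the original suggestion: it turns each array element into its own event, so the field path becomes [Records][sourceIPAddress] rather than [Records][0][sourceIPAddress].

```
filter {
  # Parse the JSON string in [message] into real event fields.
  json {
    source => "message"
  }

  # Assumption: the parsed document has a top-level "Records" array.
  # split emits one event per array element, so after this step
  # [Records] holds a single record instead of the whole array.
  split {
    field => "Records"
  }

  # Copy the nested value into a top-level field.
  mutate {
    add_field => { "sourceIPAddress" => "%{[Records][sourceIPAddress]}" }
  }
}
```

If you only need the first record, you could drop the split block and reference "%{[Records][0][sourceIPAddress]}" instead. Note that field references inside "%{...}" use the bracketed form, with every path segment in its own brackets.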
