Add a field to json body when indexing

I have a Lambda function which sends CloudTrail logs to Elasticsearch whenever new data arrives in a bucket. This works fine; however, I would like to add one extra field that is not found in standard CloudTrail logs. This is what I have at the moment:

import datetime
import gzip
import json
import traceback
from io import BytesIO

# The S3 client (s3), Elasticsearch client (es) and logger are assumed to be
# initialised at module level, outside the handler.

def handler(event, context):
    logger.info('Event: ' + json.dumps(event, indent=2))
    s3Bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    try:
        # Fetch the gzipped CloudTrail log file from S3 and decompress it
        response = s3.get_object(Bucket=s3Bucket, Key=key)
        content = gzip.GzipFile(fileobj=BytesIO(response['Body'].read())).read()
        # Index each CloudTrail record into a daily index, keyed by its eventID
        for record in json.loads(content)['Records']:
            recordJson = json.dumps(record)
            indexName = 'cloudtrail-' + datetime.datetime.now().strftime("%Y.%m.%d")
            res = es.index(index=indexName, doc_type='record', id=record['eventID'], body=recordJson)
            logger.info(res)
        return True
    except Exception as e:
        logger.error('Something went wrong: ' + str(e))
        traceback.print_exc()
        return False

What I want to do is add a field

type: cloudtrail

but I'm not sure where to add it. Do I use an Elasticsearch API call for this, or do I simply insert the field into the JSON?

Hi,
I think the simplest way would be to add it in your Lambda function, or if you can't do that, have a look at ingest pipelines:

https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
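
For example, a set processor in an ingest pipeline can add the field on the Elasticsearch side. Just a sketch with the Python client; the pipeline id is only an example:

    # Create an ingest pipeline that adds the field to every document it processes
    es.ingest.put_pipeline(
        id='add-cloudtrail-type',  # example pipeline name
        body={
            'description': 'Add a static field to CloudTrail records',
            'processors': [
                {'set': {'field': 'type', 'value': 'cloudtrail'}}
            ]
        }
    )

    # Reference the pipeline when indexing
    res = es.index(index=indexName, doc_type='record', id=record['eventID'],
                   body=recordJson, pipeline='add-cloudtrail-type')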

Also, avoid calling your field "type": depending on the language you use it can be a reserved word, and it may be confused with the _type field.

Hope this helps.

I can add it to my Lambda function; my question is how. Do I inject it into the JSON, or do I add it as I send the body for indexing?

If record is a dict, something like this may work:

    for record in json.loads(content)['Records']:
        record['my_type'] = 'cloudtrail'
        recordJson = json.dumps(record)
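
So in your existing loop it would look roughly like this (the field is injected into the dict before json.dumps; nothing changes in the es.index call itself):

    for record in json.loads(content)['Records']:
        record['my_type'] = 'cloudtrail'  # extra field ends up in the indexed document
        recordJson = json.dumps(record)
        indexName = 'cloudtrail-' + datetime.datetime.now().strftime("%Y.%m.%d")
        res = es.index(index=indexName, doc_type='record', id=record['eventID'], body=recordJson)
        logger.info(res)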

Sorry, I just noticed one other thing:

You set doc_type to record:

    res = es.index(index=indexName, doc_type='record',

It's better to set it to "_doc"; it will help when you upgrade to version 7 or later, as a lot of people use this convention.

    res = es.index(index=indexName, doc_type='_doc',

Hi Gabriel,

Thank you for your suggestion. I managed to insert the type field successfully.
Unfortunately, when I changed doc_type to _doc I got an error and had to revert the change. We won't be able to upgrade to version 7 for a while since we're using AWS's Elasticsearch service, which is at 6.5 at the moment.
Thank you again.

Not an answer, but did you look at https://www.elastic.co/cloud and https://aws.amazon.com/marketplace/pp/B01N6YCISK ?

Cloud by Elastic is one way to have access to all features, all managed by us. Think about what is already there, like Security, Monitoring, Reporting, SQL, Canvas, APM, Logs UI, Infra UI, and what is coming next :slight_smile: ...

It includes all recent releases; any time a new version is published, it is available there.

Hi David,

Yes, we considered those options but finally decided to go with the AWS managed service.

Hi @davidbien,

I recommend looking at this for possible inspiration. I use it, heavily modified, to bulk-index AWS "stuff" (e.g. CloudTrail) into my Elastic ES.

You should also consider doing CloudTrail -> S3 -> S3 Event -> SQS -> Lambda, and researching and designing for all the possible failure modes. ES will sometimes give you errors during indexing, and if you don't handle retries and design the whole pipeline for "at least once delivery", you'll lose events because of back-pressure, downtime/maintenance, mapping errors, etc. Lambda will only retry 3 times if the execution throws an exception, which can easily lead to data loss.
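
A minimal sketch of that idea, assuming an SQS-triggered Lambda with a dead-letter queue configured on the queue (load_cloudtrail_records and index_name are hypothetical helpers):

    # If indexing fails, let the exception propagate: the SQS message is then NOT
    # deleted, becomes visible again for a retry, and eventually lands in the
    # dead-letter queue if it keeps failing, instead of being silently lost.
    def handler(event, context):
        for sqs_record in event['Records']:
            s3_event = json.loads(sqs_record['body'])
            for record in load_cloudtrail_records(s3_event):  # hypothetical helper
                es.index(index=index_name(),                   # hypothetical helper
                         doc_type='record',
                         id=record['eventID'],
                         body=json.dumps(record))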

If you have enough activity in the AWS account you will find discrepancies between the number of cloudtrail events and the number of events that made it into ES.

I have also used bulk requests in my Lambda to ship many events per request to ES. Your current code does 1 event per request, which is slower and more wasteful; just a thing to consider depending on your volume. Otherwise your Lambda could process events at a rate lower than the incoming rate, which would lead to very high concurrency for your Lambda, or to lost events if you limited its concurrency.
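
A rough sketch of the bulk approach with the Python client's helpers module (the index name and the my_type field follow the earlier examples):

    from elasticsearch import helpers

    actions = []
    for record in json.loads(content)['Records']:
        record['my_type'] = 'cloudtrail'
        actions.append({
            '_index': 'cloudtrail-' + datetime.datetime.now().strftime("%Y.%m.%d"),
            '_type': 'record',
            '_id': record['eventID'],
            '_source': record,
        })

    # One bulk request per log file instead of one request per event
    helpers.bulk(es, actions)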

I can also tell you that CloudTrail events produce collisions in the index mapping, because their schema is not consistent enough. Some events have differing types for the same field name; such events can't be indexed by ES and are lost if you don't handle them.

Hi Martin,

Thank you for the link. I will have a look at it now and see how I can improve my current code.
Currently I use this setup for sending CloudTrail logs to ES:
CloudTrail -> S3 -> S3 trigger -> Lambda -> Redis -> Logstash -> Elasticsearch.
The reason behind this is that we have two endpoints at the moment and we need to send the logs to both. With Redis I am not afraid that logs will be lost.
I agree that I need to consider error handling. I occasionally get a KeyError for ['Records'], but I am not sure how to handle it yet; one idea is sketched below. I also noticed some events not appearing in my ES because of this.
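
Just a sketch, assuming the objects that trigger the KeyError simply don't contain a Records key at all (e.g. digest files written to the same bucket); in that case I could skip them instead of failing:

    data = json.loads(content)
    if 'Records' not in data:
        # Not a CloudTrail log file, skip it instead of raising a KeyError
        logger.info('No Records key in ' + key + ', skipping')
        return True
    for record in data['Records']:
        ...  # index the record as before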

Maybe not useful for you, as you use Elasticsearch on the Amazon service and you are stuck with version 6.5 [Functionbeat is in beta for that version, and I don't know how much the Amazon version diverges from the official one].
But just in case somebody else comes across this message: there's an easier way to go with Functionbeat.
https://www.elastic.co/guide/en/beats/functionbeat/current/functionbeat-overview.html
Or you can check how they did it and copy the Functionbeat code :sweat_smile:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.