I have a Lambda function which sends CloudTrail logs to Elasticsearch whenever new data arrives in a bucket. This works fine; however, I would like to add one extra field that is not found in standard CloudTrail logs. This is what I have at the moment:
```python
import datetime
import gzip
import json
import logging
import traceback
from io import BytesIO

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
# es: Elasticsearch client, configured elsewhere

def handler(event, context):
    logger.info('Event: ' + json.dumps(event, indent=2))
    s3Bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    try:
        # CloudTrail log files are gzip-compressed JSON
        response = s3.get_object(Bucket=s3Bucket, Key=key)
        content = gzip.GzipFile(fileobj=BytesIO(response['Body'].read())).read()
        for record in json.loads(content)['Records']:
            recordJson = json.dumps(record)
            indexName = 'cloudtrail-' + datetime.datetime.now().strftime("%Y.%m.%d")
            res = es.index(index=indexName, doc_type='record', id=record['eventID'], body=recordJson)
            logger.info(res)
        return True
    except Exception as e:
        logger.error('Something went wrong: ' + str(e))
        traceback.print_exc()
        return False
```
What I want to do is add a field:
type: cloudtrail
But I'm not sure where to add this. Do I use an Elasticsearch API call for this, or do I simply insert the field into the JSON?
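For what it's worth, one way to do this is to insert the field into the parsed record dict before serializing it, with no separate Elasticsearch API call needed. A minimal sketch (the record below is a hypothetical, trimmed-down CloudTrail event):

```python
import json

# Hypothetical CloudTrail record, already parsed from the log file
record = {"eventID": "abc-123", "eventName": "GetObject"}

# Insert the extra field before serializing for indexing
record["type"] = "cloudtrail"
recordJson = json.dumps(record)

print(recordJson)
```

In the Lambda above, this would go inside the loop over `json.loads(content)['Records']`, just before the `json.dumps(record)` call.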
Thank you for your suggestion. I managed to insert the type field successfully.
Unfortunately, when I changed doc_type to _doc I got an error and had to revert the change. We won't be able to upgrade to version 7 for a while, since we're using AWS's Elasticsearch Service, which is at 6.5 at the moment.
Thank you again.
I recommend looking at this for possible inspiration. I use that, heavily modified, to bulk index AWS "stuff" (e.g. cloudtrail) into my Elastic ES.
You should also consider doing CloudTrail -> S3 -> S3 Event -> SQS -> Lambda, and researching and designing for all the possible failure modes. ES will sometimes give you errors during indexing, and if you don't handle retries and design the whole pipeline for at-least-once delivery, you'll lose events to back-pressure, downtime/maintenance, mapping errors, etc. Lambda will only retry 3 times if the execution throws an exception, which can easily lead to data loss.
If you have enough activity in the AWS account you will find discrepancies between the number of cloudtrail events and the number of events that made it into ES.
I have also used bulk requests in my Lambda to ship many events per request to ES; your current code sends one event per request, which is slower and more wasteful. Just something to consider depending on your volume. Your Lambda could otherwise process events at a rate lower than the incoming rate, which would lead to very high concurrency for your Lambda, or to lost events if you limited its concurrency.
I can also tell you that CloudTrail events produce collisions in the index mapping, because their schema is not consistent enough. Some events have differing types for the same field name; such events can't be indexed by ES and are lost if you don't handle them.
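To illustrate the bulk idea: instead of one `es.index()` call per event, you can build a single `_bulk` request body for the whole log file and POST it to the cluster's `/_bulk` endpoint with your existing client or signed HTTP call. A stdlib-only sketch of building that body (the records here are hypothetical; documents that fail with mapping errors come back in the bulk response and should be sent somewhere durable for retry):

```python
import datetime
import json

def bulk_body(records):
    """Build a newline-delimited _bulk request body for a list of records."""
    indexName = 'cloudtrail-' + datetime.datetime.now().strftime("%Y.%m.%d")
    lines = []
    for record in records:
        # Each document needs an action/metadata line followed by the source
        lines.append(json.dumps({
            "index": {"_index": indexName, "_type": "record", "_id": record["eventID"]}
        }))
        lines.append(json.dumps(record))
    # The bulk API requires the body to end with a newline
    return "\n".join(lines) + "\n"

records = [{"eventID": "id-1", "eventName": "GetObject"},
           {"eventID": "id-2", "eventName": "PutObject"}]
print(bulk_body(records))
```

If you use the official elasticsearch-py client, its bulk helpers do this bookkeeping for you and can return per-document failures instead of raising.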
Thank you for the link. I will have a look at it now and see how I can improve my current code.
Currently I use this setup for sending CloudTrail logs to ES:
CloudTrail -> S3 -> S3 trigger -> Lambda -> Redis -> Logstash -> Elasticsearch.
The reason behind this is that we have two endpoints at the moment and need to send the logs to both. With Redis, I am not worried about logs being lost.
I agree that I need to consider error handling. I occasionally get a KeyError for ['Records'], but I am not sure how to handle this yet. I have also noticed some events not appearing in my ES because of this.
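On the KeyError: CloudTrail also writes digest files to the same bucket (and S3 can deliver test events) whose JSON has no top-level "Records" key, and that is a common cause of this error. A sketch of skipping such objects explicitly, rather than letting a blanket exception handler swallow them along with real failures:

```python
import json
import logging

logger = logging.getLogger(__name__)

def extract_records(content):
    """Return the list of CloudTrail records, or [] for non-log objects."""
    parsed = json.loads(content)
    if "Records" not in parsed:
        # Digest files and other non-log objects have no "Records" key
        logger.info("No 'Records' key in object; skipping")
        return []
    return parsed["Records"]

print(len(extract_records('{"Records": [{"eventID": "id-1"}]}')))
print(len(extract_records('{"awsAccountId": "123456789012"}')))
```

You can also filter on the object key prefix in the Lambda (or in the S3 event notification itself) so that digest files never trigger the function.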
This may not be useful for you, since you use Elasticsearch on Amazon's service and are stuck on version 6.5 (Functionbeat is in beta for that version, and I don't know how the Amazon version diverges from the official one).
But in case somebody else comes across this thread: there's an easier way to do this with Functionbeat. https://www.elastic.co/guide/en/beats/functionbeat/current/functionbeat-overview.html
Or you can look at how they did it and reuse the Functionbeat code.