Filebeat versions > 7.13.x do not correctly process logs from an AWS S3 bucket

Hi All!

I use Filebeat to push logs from AWS RDS databases to a local instance of Elasticsearch.
To do this, I configured AWS RDS to publish logs to a specific CloudWatch Log Group/Log Stream, and I use AWS Kinesis Firehose to forward the logs (compressed as .gz files) to a particular S3 bucket. I then configured an AWS SQS queue to be triggered by every Object Created event on the bucket.
Up to Filebeat version 7.13.x everything works fine, but starting with version 7.14 I can see the event being processed by Filebeat, yet once the .gz file is downloaded and processed, no logs are generated in output.
I built a simple lab to study the behavior, simplifying it with an "output to file" section. The configuration is below.

name: filebeat-test

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: true

filebeat.inputs:
- type: aws-s3
  queue_url: ${AWS_SQS_QUEUE_URL}
  access_key_id: ${AWS_ACCESS_KEY}
  secret_access_key: ${AWS_SECRET_KEY}
  expand_event_list_from_field: Records

output.file:
  path: "/tmp/filebeat"
  filename: filebeat.log
#================================ Processors =====================================

# Configure processors to enhance or manipulate events generated by the beat.

logging.level: debug
logging.to_files: false
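
As far as I understand, expand_event_list_from_field: Records tells the aws-s3 input to parse each S3 object as a single JSON document and to emit one event per element of its Records array. To check whether the Firehose-delivered objects actually have that shape, I used a quick script like the one below (a rough sketch: the bucket and key are placeholders, and boto3 credentials are assumed to come from the environment):

import gzip
import io
import json

import boto3

# Placeholders: replace with a real bucket/key from the Firehose delivery prefix.
BUCKET = "dev-example-database-gz-bucket"
KEY = "2022/10/27/13/example-delivery.gz"

s3 = boto3.client("s3")
body = s3.get_object(Bucket=BUCKET, Key=KEY)["Body"].read()

with gzip.GzipFile(fileobj=io.BytesIO(body)) as fh:
    text = fh.read().decode("utf-8")

# How many newline-separated lines does the object contain?
print("lines:", len(text.splitlines()))

# Does it start with a top-level JSON object, and what keys does it have?
# (raw_decode raises if the content does not start with a JSON value)
obj, end = json.JSONDecoder().raw_decode(text)
print("top-level keys:", list(obj.keys()) if isinstance(obj, dict) else type(obj))
print("decoded length vs total length:", end, len(text))

In my lab, the delivered objects turn out to contain JSON objects concatenated back to back, with no newline between them.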

The logs that I obtain with version 8.4.3 are:

filebeat-test  | {"log.level":"debug","@timestamp":"2022-10-27T15:38:46.576Z","log.logger":"input.aws-s3.sqs_s3_event","log.origin":{"file.name":"awss3/sqs_s3_event.go","file.line":277},"message":"SQS message contained 1 S3 event notifications.","service.name":"filebeat","id":"A0E1FA1D7D637307","queue_url":"BLURRED","message_id":"d9f51057-e4c1-4514-b8e9-fe315cb9ef59","message_receipt_time":"2022-10-27T15:38:46.576Z","ecs.version":"1.6.0"}
filebeat-test  | {"log.level":"debug","@timestamp":"2022-10-27T15:38:46.577Z","log.logger":"input.aws-s3.sqs_s3_event","log.origin":{"file.name":"awss3/s3_objects.go","file.line":124},"message":"Begin S3 object processing.","service.name":"filebeat","id":"A0E1FA1D7D637307","queue_url":"BLURRED","message_id":"d9f51057-e4c1-4514-b8e9-fe315cb9ef59","message_receipt_time":"2022-10-27T15:38:46.576Z","bucket_arn":"dev-blurred-database-gz-bucket","object_key":"2022/10/27/13/BLURRED-1-2022-10-27-13-28-36-fa320c76-a789-4ee1-9836-b5620cf3e699.gz","ecs.version":"1.6.0"}
filebeat-test  | {"log.level":"debug","@timestamp":"2022-10-27T15:38:46.616Z","log.logger":"input.aws-s3.sqs_s3_event","log.origin":{"file.name":"awss3/s3_objects.go","file.line":132},"message":"End S3 object processing.","service.name":"filebeat","id":"A0E1FA1D7D637307","queue_url":"BLURRED","message_id":"2a7c40b2-7a52-424b-94d6-6490d40dac4c","message_receipt_time":"2022-10-27T15:38:45.925Z","bucket_arn":"dev-blurred-database-gz-bucket","object_key":"2022/10/27/13/BLURRED-1-2022-10-27-13-53-51-2f34ba6e-6972-4989-9c17-84dc2b70502d.gz","elapsed_time_ns":689591300,"ecs.version":"1.6.0"}
filebeat-test  | {"log.level":"debug","@timestamp":"2022-10-27T15:38:46.616Z","log.logger":"input.aws-s3.sqs_s3_event","log.origin":{"file.name":"awss3/sqs_s3_event.go","file.line":299},"message":"End processing SQS S3 event notifications.","service.name":"filebeat","id":"A0E1FA1D7D637307","queue_url":"BLURRED","message_id":"2a7c40b2-7a52-424b-94d6-6490d40dac4c","message_receipt_time":"2022-10-27T15:38:45.925Z","ecs.version":"1.6.0"}

With version 7.13.1, on the other hand, all the logs are written to the filebeat.log file as expected.
I have not been able to figure out what the issue is.
Could you please help me understand?
Thanks in advance.
A.

Up!!

I found a workaround: a Lambda function that downloads the .gz file locally, extracts the content, adds a newline character at the end of every JSON object, repacks the .gz file, saves the fixed file to the S3 bucket, and deletes the unfixed one. It works, but it is more expensive and I don't like it from a compliance point of view, because the processing performed by the Lambda function could be considered "log tampering" by regulatory entities.

Here is the draft of the Lambda function:

import io
import boto3
import urllib.parse
import os
import gzip
import json
import logging
import re


def parse_json_obj_from_string(string):
    # Decode back-to-back JSON objects and return them newline-delimited.
    dec = json.JSONDecoder()
    pos = 0
    out_str = ""
    while pos < len(string):
        try:
            obj, json_len = dec.raw_decode(string[pos:])
        except Exception as e:
            logging.error(e)
            pos += 1  # skip the offending character (e.g. stray whitespace)
            continue
        pos += json_len
        out_str = out_str + json.dumps(obj) + "\n"
    return out_str


def repack_fixed_gz_file(string, gzFileName):
    # Write the newline-delimited content into a new .gz file.
    with gzip.open(gzFileName, "wt") as gz_file:
        gz_file.write(string)

def lambda_handler(event, context):
    TARGET_BUCKET = os.getenv("TARGET_BUCKET")

    # Get incoming bucket and key
    source_bucket = event['Records'][0]['s3']['bucket']['name']
    source_key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])

    # Copy object to different bucket
    s3_resource = boto3.resource('s3')

    try:
        source_key_filename = re.search(r"[^/]+$", source_key).group()
        source_key_path = re.sub(r"[^/]+$", "", source_key)
    except Exception as e:
        logging.error(e)
        raise e

    try:
        # Connect to s3 service
        s3_client = boto3.client("s3")
        # Get the file from S3
        response = s3_client.get_object(Bucket=source_bucket, Key=source_key)
        # Read the content of the file
        content = response['Body'].read()
        # Extract the content from the gz file and fix it by adding a newline at the end of each JSON object
        with gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb') as fh:
            fixed_content = parse_json_obj_from_string(fh.read().decode("utf-8"))
        # Add the prefix and the right extension
        fixed_filename = "fixed_" + source_key_filename + ".gz"
        # Repack the GZ file
        repack_fixed_gz_file(fixed_content, "/tmp/" + fixed_filename)
    except Exception as e:
        logging.error(e)
        raise e

    try:
        # Copy the fixed file to the target bucket (upload_file returns None)
        s3_resource.Bucket(TARGET_BUCKET).upload_file("/tmp/" + fixed_filename, source_key_path + fixed_filename)
        # Delete the original file
        delete_response = s3_resource.Bucket(source_bucket).Object(source_key).delete()
        os.remove("/tmp/" + fixed_filename)

        logging.info("Uploaded fixed object %s to bucket %s", source_key_path + fixed_filename, TARGET_BUCKET)
        return {
            "statusCode": 200,
            "body": json.dumps({
                "fixed_key": source_key_path + fixed_filename,
                "delete_status": delete_response['ResponseMetadata']['HTTPStatusCode'],
            })
        }
    except Exception as e:
        logging.error(e)
        logging.error('Error processing object {} from bucket {}. Make sure it exists and your bucket is in the same region as this function.'.format(source_key, source_bucket))
        raise e
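
If you want to sanity-check the newline-fixing helper locally before wiring up the Lambda trigger, something like this works (the sample records below are made up for illustration):

# Quick local check of parse_json_obj_from_string with two made-up JSON
# objects concatenated without a newline between them.
sample = (
    '{"messageType": "DATA_MESSAGE", "logEvents": [{"id": "1"}]}'
    '{"messageType": "DATA_MESSAGE", "logEvents": [{"id": "2"}]}'
)

ndjson = parse_json_obj_from_string(sample)
print(ndjson)
# Expect two lines, each one a valid standalone JSON document.
assert len(ndjson.strip().splitlines()) == 2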

I hope this is useful to someone. Anyway, I'm interested in a more integrated solution.
Best regards.
M.
