I'm using the aws-s3 input to fetch S3 objects, triggered by SQS notifications.
These objects contain CloudWatch log group streams, i.e. logs from a Lambda function.
The problem is that I'm getting almost the entire log under a single field named "message" in Kibana.
Looking at the Filebeat logs, I can see that the error.message is "parsing input as JSON: multiple json elements found", and error.field is "message".
And I can see why that is when looking at my S3 objects: they contain some info from SQS as well as the whole log stream from CloudWatch, which means there are multiple "message" elements across multiple levels of the JSON.
What should a processor look like to handle such a case?
Here's an example of what my S3 object looks like:
{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "/aws/lambda/myapp-adapter",
  "logStream": "2024/03/14/[$LATEST]STREAM",
  "subscriptionFilters": [
    "Subscription Filter"
  ],
  "logEvents": [
    {
      "id": "some-id",
      "timestamp": 1710431203354,
      "message": "{\"level\":\"INFO\",\"message\":\"Refreshing Oauth tokens\",\"service\":\"myappPublicApp\",\"timestamp\":\"2024-03-14T15:46:43.354Z\",\"xray_trace_id\":\"xray-id\",\"adapter\":\"myapp\",\"class\":\"myappApiClient\",\"userId\":1,\"eventType\":\"myapp Oauth\"}\n"
    },
    {
      "id": "some-id",
      "timestamp": 1710431203913,
      "message": "{\"level\":\"ERROR\",\"message\":\"Failed to refresh access token\",\"service\":\"myappPublicApp\",\"timestamp\":\"2024-03-14T15:46:43.896Z\",\"xray_trace_id\":\"xray-id\",\"class\":\"getmyappClient\",\"userId\":1,\"eventType\":\"myapp Oauth\",\"error\":{\"message\":\"Request failed with status code 400\",\"name\":\"AxiosError\",\"stack\":\"AxiosError: Request failed with status code 400\\n"
    },
    {
      "id": "some-id",
      "timestamp": 1710431203914,
      "message": "2024-03-14T15:46:43.914Z\tSOME-UUID\tERROR\tInvoke Error \t{\"errorType\":\"Error\",\"errorMessage\":\"\",\"stack\":[\"Error\",\" at /var/task/bundle/myappAdapter.js:679:15\",\" at Generator.throw (<anonymous>)\",\" at rejected (/var/task/bundle/myappAdapter.js:629:65)\",\" at process.processTicksAndRejections (node:internal/process/task_queues:95:5)\"]}\n"
    }
  ]
}
As you can see, there are multiple "message" elements, since they're part of an array.
Here are the two Filebeat log entries produced when it tried to process the above file:
{
  "log.level":"debug",
  "@timestamp":"2024-03-15T08:08:39.641Z",
  "log.logger":"truncate_fields",
  "log.origin": {
    "function":"github.com/elastic/beats/v7/libbeat/processors/actions.(*decodeJSONFields).Run",
    "file.name":"actions/decode_json_fields.go",
    "file.line":123
  },
  "message":"Error trying to unmarshal <Whole S3 file content, same as above>",
  "service.name":"filebeat",
  "ecs.version":"1.6.0"
}
{
  "log.level":"debug",
  "@timestamp":"2024-03-15T08:08:39.642Z",
  "log.logger":"processors",
  "log.origin":{
    "function":"github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Run",
    "file.name":"processing/processors.go",
    "file.line":136
  },
  "message":"Fail to apply processor global{decode_json_fields=message}: multiple json elements found",
  "service.name":"filebeat",
  "ecs.version":"1.6.0"
}
Finally, this is how I've set up my processor:
filebeat.inputs:
- type: aws-s3
  queue_url: ""
  json.keys_under_root: true
processors:
- decode_json_fields:
    fields: ["message"]
    process_array: true
    target: ""
    overwrite_keys: true
    add_error_key: true
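For what it's worth, one direction I'm considering (not sure it's the right fix): the aws-s3 input has an expand_event_list_from_field option, which splits an array field into one event per element before processors run. Applied to the logEvents array above, each event would carry a single "message" string, so decode_json_fields wouldn't see multiple JSON elements at once. A rough sketch of what I think that would look like, based on my object's field names:

```yaml
filebeat.inputs:
- type: aws-s3
  queue_url: ""
  # Emit one event per element of the logEvents array, instead of one
  # event for the whole S3 object.
  expand_event_list_from_field: logEvents

processors:
- decode_json_fields:
    fields: ["message"]
    target: ""
    overwrite_keys: true
    # Some messages (the plain-text Lambda "Invoke Error" lines) aren't
    # JSON, so keep the error key to spot those events rather than fail.
    add_error_key: true
```

I'd expect the third logEvent (the tab-separated Invoke Error line) to still fail JSON decoding, so I assume add_error_key is needed to keep those events identifiable. Is this the intended approach, or should this be handled differently?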