Challenges with parsing AWS EKS Logs - Custom Cloudwatch Integration

I'm working on parsing AWS EKS logs that are ingested into Elastic via the custom CloudWatch integration + Filebeat. The problem is that right now the entire Kubernetes audit log is captured in the message field, and it contains complex, deeply nested fields; I don't think I have the knowledge/skill at this point to write a reliable processor myself. I was wondering if anyone had ideas or strategies to deal with this situation, or if there are any special-purpose tools or existing processors/pipelines that might help.

Thanks!!!

Hi @BKgingersnap Welcome to the community.

Always include the version you are on please.

Perhaps take a look at this.

Agent Version is 8.16.1

Ingest Pipelines:
logs-aws.cloudwatch_logs-2.24.0
logs-aws.cloudwatch_logs@custom

Mappings:
logs-aws.cloudwatch_logs@package

Integration Policy:
aws-usw2-cloudwatch-eks-prod-2 v2.24.0

Thanks for the article, will read through and see if this helps.

So I was able to follow the steps here: Bringing Your Cloud-Managed Kubernetes Audit Logs into Elasticsearch — Elastic Observability Labs

to parse the EKS logs delivered via CloudWatch into the Kubernetes index.
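
For anyone following along, the core of those steps is essentially a json processor that expands the message field into structured fields — a minimal sketch (the target field name here is my assumption; adjust to match what the article and the Kubernetes integration expect):

{
  "json": {
    "field": "message",
    "target_field": "kubernetes.audit"
  }
}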

Now I need to do the same thing with Azure Kubernetes Logs

Currently, AKS logs are delivered via Filebeat to data_stream.dataset: azure.activitylogs

The AKS logs are broadly the same as the EKS logs, except that on the azure.activitylogs data stream the field containing the K8s audit logs is a flattened field rather than a text field. So when I try to use the steps in the article I get an error. What would be the approach to deal with this?

What does the log look like? Can you share an example?

As @leandrojmp said, you are going to need to share detailed specifics ... "broadly matches" is not going to get you there :slight_smile:

And how exactly are you shipping the AKS audit logs? Through Azure Diagnostics and an Event Hub?

Please share your ingestion path and your Filebeat configuration...

Here is an example of the difference

EKS Message log -
Text/string data type with the whole log contained in a JSON block. Parsed easily using the Kubernetes default mapping:

{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata","auditID":"<redacted>","stage":"ResponseComplete","requestURI":"<redacted>","verb":"get","user":{"username":"<redacted>","uid":"<redacted>","groups":["system:serviceaccounts","system:serviceaccounts:default","system:authenticated"],"extra":{"authentication.kubernetes.io/pod-name":["<redacted>"],"authentication.kubernetes.io/pod-uid":["<redacted>"]}},"sourceIPs":["<redacted>"],"userAgent":"<redacted>","objectRef":{"resource":"gateways","namespace":"<redacted>","name":"<redacted>","apiGroup":"<redacted>","apiVersion":"<redacted>"},"responseStatus":{"metadata":{},"code":200},"requestReceivedTimestamp":"2025-03-04T06:22:18.819232Z","stageTimestamp":"2025-03-04T06:22:18.822532Z","annotations":{"authorization.k8s.io/decision":"allow","authorization.k8s.io/reason":"<redacted> of ClusterRole \"<redacted>\" to ServiceAccount \"<redacted>\""}}

AKS azure.activitylogs.properties

  • Flattened field with "pod" and "containerID" fields before the audit-log JSON block, then a "stream" field after:
{
  "pod": "<redacted>",
  "containerID": "<redacted>",
  "log": "{\"kind\":\"Event\",\"apiVersion\":\"audit.k8s.io/v1\",\"level\":\"Metadata\",\"auditID\":\"<redacted>\",\"stage\":\"ResponseComplete\",\"requestURI\":\"<redacted>\",\"verb\":\"update\",\"user\":{\"username\":\"<redacted>\",\"groups\":[\"system:masters\",\"system:authenticated\"]},\"sourceIPs\":[\"<redacted>\"],\"userAgent\":\"<redacted>\",\"objectRef\":{\"resource\":\"leases\",\"namespace\":\"kube-system\",\"name\":\"<redacted>\",\"uid\":\"<redacted>\",\"apiGroup\":\"coordination.k8s.io\",\"apiVersion\":\"v1\",\"resourceVersion\":\"<redacted>\"},\"responseStatus\":{\"metadata\":{},\"code\":200},\"requestReceivedTimestamp\":\"2025-03-04T06:22:18.599549Z\",\"stageTimestamp\":\"2025-03-04T06:22:18.604251Z\",\"annotations\":{\"authorization.k8s.io/decision\":\"allow\",\"authorization.k8s.io/reason\":\"\"}}\n",
  "stream": "<redacted>"
}

I was able to map the AKS logs to the Kubernetes default mapping by doing the following:

Run the following pipeline, which converts the flattened field azure.activitylogs.properties to a string, then runs one gsub to remove everything from "{pod=" up to "log=", and a second gsub to remove ", stream=stdout, containerID=.*".

And then run the two pipelines from the article posted above (changing names as needed):

[
  {
    "convert": {
      "field": "azure.activitylogs.properties",
      "type": "string",
      "target_field": "message"
    }
  },
  {
    "gsub": {
      "field": "message",
      "pattern": "\\{pod=.*?log=",
      "replacement": ""
    }
  },
  {
    "gsub": {
      "field": "message",
      "pattern": ", stream=stdout, containerID=.*",
      "replacement": ""
    }
  }
]
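
For anyone trying this, you can dry-run the processors with the _simulate API before attaching them to logs-azure.activitylogs@custom — a sketch with an abbreviated test document (the "log" value here is a stand-in, not a real audit event):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "convert": { "field": "azure.activitylogs.properties", "type": "string", "target_field": "message" } },
      { "gsub": { "field": "message", "pattern": "\\{pod=.*?log=", "replacement": "" } },
      { "gsub": { "field": "message", "pattern": ", stream=stdout, containerID=.*", "replacement": "" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "azure": {
          "activitylogs": {
            "properties": {
              "pod": "example-pod",
              "containerID": "abc123",
              "log": "{\"kind\":\"Event\",\"apiVersion\":\"audit.k8s.io/v1\"}",
              "stream": "stdout"
            }
          }
        }
      }
    }
  ]
}

The response shows the resulting message field for each doc. Note that the gsub patterns assume a particular key order in the stringified map (pod and containerID before log, stream after), so check the simulate output against your real documents before relying on them.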