Hi All,
We are receiving logs in a Kafka topic in the following format and need to process them using a Logstash filter to send them to Elasticsearch. The requirement is to ignore the first type of log entirely and to separate each payload in the second type of log as an individual log entry.
{"@timestamp":"2024-12-18T18:06:59.324Z","@metadata":{"beat":"metricbeat","type":"_doc","version":"8.15.2","raw_index":"metrics-elastic_agent.elastic_agent-default","input_id":"metrics-monitoring-agent","stream_id":"metrics-monitoring-agent"},"agent":{"id":"52887bbd-0645-4c47-8e63-4095f2705fa8","ephemeral_id":"6d4858fd-5683-4c04-a670-5a29272b129","name":"di.europe.e.local","type":"metricbeat","version":"8.15.2"},"system":{"process":{"cpu":{"system":{"ticks":80,"time":{"ms":80}},"total":{"ticks":130,"time":{"ms":130},"value":130},"user":{"ticks":1150,"time":{"ms":1150}}},"memory":{"size":5.258164e+07},"fd":{"open":27,"limit":{"soft":4096,"hard":4096}}}},"service":{"address":"http://localhost:6791/stats","type":"http"},"elastic_agent":{"version":"8.15.2","process":"elastic-agent","id":"52887bbd-0645-4c47-8e63-4095f2705fa8","snapshot":false},"component":{"binary":"elastic-agent","id":"elastic-agent"},"data_stream":{"type":"metrics","dataset":"elastic_agent.elastic_agent","namespace":"default"},"ecs":{"version":"8.0.0"},"host":{"name":"di.europe.e.local","ip":["10.178.2.e1","fe80::477:70ff:fefe:3fdb"],"mac":["06-77-70-FE-3F-DB"],"hostname":"di.europe.e.local","architecture":"x86_64","os":{"kernel":"3.10.0-1160.118.1.el7.x86_64","codename":"Maipo","type":"linux","platform":"rhel","version":"7.7 (Maipo)","family":"redhat","name":"Red Hat Enterprise Linux Server"},"id":"69f417b1a15a420eb9acfd36802ff697","containerized":false},"cloud":{"machine":{"type":"c5.xlarge"},"instance":{"name":"ip-10-178-2-1.europe.e.local","id":"i-0528118076d76fe10"},"availability_zone":"eu-west-1a","service":{"name":"Nova"},"provider":"openstack"},"event":{"dataset":"elastic_agent.elastic_agent","module":"http","duration":3075576},"metricset":{"name":"json","period":60000}}
{"@timestamp":"2024-12-18T18:07:00.331Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.15.2","raw_index":"logs-httpjson.generic-default","input_id":"generic-httpjson","stream_id":"httpjson-httpjson.generic"},"cloud":{"provider":"openstack","machine":{"type":"c5.xlarge"},"instance":{"name":"ip-10-178-2-1.europe.e.local","id":"i-0528118076d76fe10"},"availability_zone":"eu-west-1a","service":{"name":"Nova"}},"event":{"created":"2024-1-18T18:07:00.331Z","dataset":"httpjson.generic"},"input":{"type":"httpjson"},"data_stream":{"dataset":"httpjson.generic","namespace":"default","type":"logs"},"elastic_agent":{"id":"52887bbd-0645-4c47-8e63-4095f2705fa8","snapshot":false,"version":"8.15.2"},"agent":{"id":"52887bbd-0645-4c47-8e63-4095f2705fa8","version":"8.15.2","ephemeral_id":"9999755c-b2b1-4939-be85-37734733f1a8","name":"di.europe.e.local","type":"filebeat"},"ecs":{"version":"8.0.0"},"message":"{\"pagedResultsCookie\":\"eyJfc29ydEtleXMiOm51bGwsIl9wYWdlZFJlc3VsdHNDb29raWUiOiJjbzhIQWZRdWNQZ3dPTEpYSWN3NW5Nb2ZtUlFiNldFY1dDNXdUZExSS2NqSklPbUotQThnQVM1bFJ0RUJHZjBXZVRPdEE3WEg3NFpZdTBfa3VYWEwwN0ZTT0pvVXRVTzIwZG5YU2tLc0c3WnZMdi1YTU9QNmxKVVA0aVF4R0ZfWXpLYjRzWG16NUtQbGNuRW9ERkRkZHRJUm1VTkRhY1lHcE5wM2RMdkUtejNtRDIwNHlYelFTRkhhbU5iR1E3c1NpOFM1N1hpUTkxeVFoMFJhdEtOaTJKOUkxemFka3JHM1liV05fb1VTbmEyQ2lUWXJ6NUNRQXdtNDBRQUpldnVCVGl3MVo1NkdSMGozZlFxWDl3NFJYNlVibzViaUdfMXg0SW9mWDJzeHoxazVRSUZqMXVoTzZkMFFFUEE5M19mbUhybnpxVlVRVnl2WnVfWHhEaXdaU1d6RXNFQzFvOVM5WEFqREs2UWgxZTlLYlRIaHdmQWdwZGFORWFkX0NENDZGZU8yZkFiYW5hbHp1OUVaNF9nWFFSWjR5UUZpVkt0YmRqbXRBd3I0LXRQWjM2N0RaSTFfU0xodTUwbVZiNDZZMEMwLXZVMk13aS05dmlURmRoNTU2cldRN1lYT3dhS0p2cnN0UFhEcWNwRWZVaGRHU1hnY3psbHZOQXlvYWRVR2hSZjJadW4xWmJSVlpFbzFNMG9Jb1dYbzQ4c19lUW11Z21xVC0yaVFXWUJhcWRNYUZ3MGlNQTU2RnN5NkozSWlQeTZEVTI0RnVpTm9sY2FuSlVQU0RTVGlWTXh5blpnNy1hYXVBQTFBX0xVSzZuS2NGMHBxazZtTWV0WEhwRGxkOHpneWlpb0s3MkczUThhSUlubm1vdlR2ZC1VRFZzOXlhTmw0Q1NjTU5KamtFeWRyNDlod1dHUldSNXVEVDlid0dCWVZOVW9XOW5mcjhyc19mWTN2bkFiQWZ5Q0FiSVR0UW5IdHpfNTlZeUlwallpdWtINmVQN1cySlQ0aUY5UkZURVhxTlI5cHJ3Qjk4b2pLdy1maDZsVTl6TW10d2swTF9xams0amF6eHE5UmxZQ1k2THhuQjJGV2J1LWI4d1RIWklkTWw4dTV1VU5uVnRGUFZYMWtwZHpRcXhLQU9Wb2hYanlzaGJJaHpZXy1MZDhXVnBONmczd1lCOWpJT2NyS3ZyVnRiN2lhQnpOVUNzSTVjTTJmcnJNeThGVHRCTDhjZTBRVV80a0NWLTlsY3k0MG1aa3VHZ0ZlZWo4dG9rdHFIQmpPTE1WYXVaUjdoYmUxbm1uNThfLXlOcXA3WnlxMmNjZHEzUi1CS3RqT01ySmNsS1NVVWMydUF6VzhiODlCQ0FjU1ZSdDg4b2pNVG5nTU5Tc0QwVzh4cXhGd3pXZlZYd1dud3BtZnd2LWVUT2plbGRuVHk3RzE3dTZhclRRdlc3TzhVRVJyN1dHaE5nZzFlUW05SFFLbFJENzVCLUVoYXZTY1QyakM1MVl3UHFvTVk1OEhpTnZOVUJfQ2ZabUVseUMzTVhLUDdqdkptRFhlc0UwRHZhazJmLW43VEpYSW5pNXNfOVJSME1RaUI2VENDYUtIc1NWQlF6Y0dlMjZFWWdrIiwiX3BhZ2VkUmVzb3VyY2VzT2Zmc2V0IjoxMDAwLCJfcGFnZVNpemUiOjEwMDAsImFyZ3MiOnsiYmVnaW5UaW1lIjpbIjIwMjQtMTItMTdUMTg6MDY6NTlaIl0sImVuZFRpbWUiOlsiMjAyNC0xMi0xOFQxODowNjo1OVoiXSwic291cmNlIjpbImFtLWV2ZXJ5dGhpbmciXX19\",\"remainingPagedResults\":-1,\"result\":[{\"payload\":{\"_id\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45796\",\"client\":{\"ip\":\"10.1.1.44\",\"port\":50158},\"component\":\"OAuth\",\"eventName\":\"AM-ACCESS-ATTEMPT\",\"http\":{\"request\":{\"headers\":{\"content-type\":[\"application/x-www-form-urlencoded\"],\"host\":[\"am.fr-platform\"],\"user-agent\":[\"Go-http-client/1.1\"]},\"method\":\"POST\",\"path\":\"https://am.fr-platform/am/oauth2/realms/root/introspect\",\"secure\":true}},\"level\":\"INFO\",\"realm\":\"/\",\"request\":{\"detail\":{}},\"source\":\"audit\",\"timestamp\":\"2024-1-17T18:07:22.625Z\",\"topic\":\"access\",\"transactionId\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45792\"},\"source\":\"am-access\",\"timestamp\":\"2024-1-17T18:07:22.625887502Z\",\"type\":\"application/json\"},{\"payload\":{\"_id\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45799\",\"component\":\"Authentication\",\"entries\":[{\"info\":{\"authControlFlag\":\"REQUIRED\",\"authIndex\":\"module_instance\",\"authLevel\":\"0\",\"ipAddress\":\"10.1.1.44\",\"moduleClass\":\"Application\"},\"moduleId\":\"Application\"}],\"eventName\":\"AM-LOGIN-MODULE-COMPLETED\",\"level\":\"INFO\",\"principal\":[\"org-environment-resource-server\"],\"realm\":\"/\",\"result\":\"SUCCESSFUL\",\"source\":\"audit\",\"timestamp\":\"2024-1-17T18:07:22.627Z\",\"topic\":\"authentication\",\"trackingIds\":[\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45797\"],\"transactionId\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45792\"},\"source\":\"am-authentication\",\"timestamp\":\"2024-1-17T18:07:22.627418513Z\",\"type\":\"application/json\"},{\"payload\":{\"_id\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45801\",\"component\":\"Authentication\",\"entries\":[{\"info\":{\"authIndex\":\"module_instance\",\"authIndexValue\":\"Application\",\"authLevel\":\"0\",\"ipAddress\":\"10.1.1.44\"},\"moduleId\":\"Application\"}],\"eventName\":\"AM-LOGIN-COMPLETED\",\"level\":\"INFO\",\"principal\":[\"org-environment-resource-server\"],\"realm\":\"/\",\"result\":\"SUCCESSFUL\",\"source\":\"audit\",\"timestamp\":\"2024-1-17T18:07:22.627Z\",\"topic\":\"authentication\",\"trackingIds\":[\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45797\"],\"transactionId\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45792\",\"userId\":\"id=org-environment-resource-server,ou=agent,ou=am-config\"},\"source\":\"am-authentication\",\"timestamp\":\"2024-1-17T18:07:22.628067715Z\",\"type\":\"application/json\"},{\"payload\":{\"_id\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45803\",\"client\":{\"ip\":\"10.1.1.44\",\"port\":50158},\"component\":\"OAuth\",\"eventName\":\"AM-ACCESS-OUTCOME\",\"http\":{\"request\":{\"headers\":{\"content-type\":[\"application/x-www-form-urlencoded\"],\"host\":[\"am.fr-platform\"],\"user-agent\":[\"Go-http-client/1.1\"]},\"method\":\"POST\",\"path\":\"https://am.fr-platform/am/oauth2/realms/root/introspect\",\"secure\":true}},\"level\":\"INFO\",\"realm\":\"/\",\"request\":{\"detail\":{\"resourceOwnerId\":\"id=org-engine-client,ou=agentonly,ou=am-config\"}},\"response\":{\"detail\":{\"active\":true,\"client_id\":\"org-engine-client\",\"scope\":\"fr:idc:p1connect:read fr:idc:custom-domain:* fr:idc:federation:* fr:idc:sso-cookie:* fr:idc:content-security-policy:* fr:idc:p1connect:delete fr:idc:certificate:* fr:idc:cookie-domain:*\",\"token_type\":\"Bearer\",\"username\":\"org-engine-client\"},\"elapsedTime\":13,\"elapsedTimeUnits\":\"MILLISECONDS\",\"status\":\"SUCCESSFUL\",\"statusCode\":\"200\"},\"source\":\"audit\",\"timestamp\":\"2024-1-17T18:07:22.638Z\",\"topic\":\"access\",\"trackingIds\":[\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45797\"],\"transactionId\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45792\",\"userId\":\"id=org-environment-resource-server,ou=agent,ou=am-config\"},\"source\":\"am-access\",\"timestamp\":\"2024-1-17T18:07:22.638583229Z\",\"type\":\"application/json\"},{\"payload\":{\"_id\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45808\",\"client\":{\"ip\":\"10.1.1.44\",\"port\":50158},\"component\":\"OAuth\",\"eventName\":\"AM-ACCESS-ATTEMPT\",\"http\":{\"request\":{\"headers\":{\"content-type\":[\"application/x-www-form-urlencoded\"],\"host\":[\"am.fr-platform\"],\"user-agent\":[\"Go-http-client/1.1\"]},\"method\":\"POST\",\"path\":\"https://am.fr-platform/am/oauth2/realms/root/introspect\",\"secure\":true}},\"level\":\"INFO\",\"realm\":\"/\",\"request\":{\"detail\":{}},\"source\":\"audit\",\"timestamp\":\"2024-1-17T18:07:22.716Z\",\"topic\":\"access\",\"transactionId\":\"8a962dd0-c6b8-46d5-8b66-e2b0daf2e4de-45804\"},\"source\":\"am-access\",\"timestamp\":\"2024-1-17T18:07:22.717596682Z\",\"type\":\"application/json\"}
However, when I try using filters, the log count remains fixed at 1, and the existing log keeps getting overwritten. While the log size increases, the log count does not change.
Could anyone suggest a filter configuration that ensures each payload result is treated as a separate log entry with a unique fingerprint or UUID? Below is the filter I am currently using:
filter {
json {
source => "message"
target => "parsed_message"
}
split {
field => "[parsed_message][result]"
}
json {
source => "[parsed_message][result][payload]"
target => "parsed_payload"
}
mutate {
rename => {
"[parsed_payload][_id]" => "log_id"
"[parsed_payload][timestamp]" => "@timestamp"
"[parsed_payload][component]" => "component"
"[parsed_payload][eventName]" => "event_name"
}
remove_field => ["message", "parsed_message", "[parsed_message][result]"]
}
fingerprint {
source => ["log_id", "event_name", "transactionId"]
target => "[@metadata][fingerprint]"
method => "SHA1"
}
}