I have a huge volume of logs coming from a Cisco ASA firewall, mostly firewall deny messages. For example, I get more than 10 messages from the same source IP accessing the same destination on the same destination port. So the idea is: instead of sending 10 separate messages to Elasticsearch, I want to send one aggregated message with an additional field, total_hits.
My original messages look like the one below (many within a short period of time, say 10 events per second):
04 10 2020 16:00:34 10.1.254.26 <LOC6:INFO> Apr 10 16:00:34 host1.abc.com %ASA-6-106100: access-list GLOBAL_ACCESS_IN denied tcp OUTSIDE/1.1.1.1(51595) -> INSIDE/2.2.2.2(873) hit-cnt 1 first hit [0x93138c00, 0x00000000]
Here is my Logstash config file:
input {
  udp {
    host => "10.0.2.15"
    port => 514
    type => "syslog"
  }
}
filter {
  grok {
    match => {"message" => " %{DATA:Timestamp} %{IP:Firewall_IP} %{DATA:loglevel} %{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{DATA:FW} %{WORD:policy_id} %{CISCO_ACTION:action} %{WORD:protocol} %{DATA:src_interface}\/%{IP:src_ip}\(%{INT:src_port}\) %{DATA} %{DATA:dst_interface}\/%{IP:dst_ip}\(%{INT:dst_port}\) %{DATA} %{INT:hit_count} %{CISCO_INTERVAL:interval} \[%{DATA:hashcode1}, %{DATA:hashcode2}\]"}
  }
  mutate {
    remove_field => ["@timestamp","@version","path","host"]
  }
  aggregate {
    task_id => "%{src_ip}_%{dst_ip}_%{dst_port}"
    code => "map['total_hits'] ||= 0; map['total_hits'] += 1;"
    map_action => "create_or_update"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "src_ip"
    timeout => 300
    inactivity_timeout => 30
  }
}
output {
  stdout { codec => "rubydebug" }
}
The config above outputs the event below after every message:
{
"@version" => "1",
"total_hits" => 4,
"@timestamp" => 2020-04-12T13:01:01.942Z,
"src_ip" => "1.1.1.1_2.2.2.2_333"
}
However, what I want to achieve is to add a new field called total_hits to the end of the original message.
Any help?
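
A minimal sketch of one possible direction, not a verified fix. The event pushed on timeout contains only what the code block stored in the map, plus the field named by timeout_task_id_field, which is why the output above shows src_ip holding the whole task id. The sketch assumes it is acceptable to keep the fields of the first event of each group and to drop the individual events with event.cancel(). The names aggregation_key and aggregated are hypothetical and only for illustration. The aggregate filter is also documented as needing a single pipeline worker (-w 1) to behave reliably.

```
filter {
  aggregate {
    task_id => "%{src_ip}_%{dst_ip}_%{dst_port}"
    # Count the hits and remember the first event's fields in the map,
    # then cancel the individual event so that only the aggregated
    # event is sent on (assumption: originals are not needed downstream).
    code => "
      map['total_hits'] ||= 0
      map['total_hits'] += 1
      map['message']  ||= event.get('message')
      map['src_ip']   ||= event.get('src_ip')
      map['dst_ip']   ||= event.get('dst_ip')
      map['dst_port'] ||= event.get('dst_port')
      map['action']   ||= event.get('action')
      map['protocol'] ||= event.get('protocol')
      event.cancel()
    "
    map_action => "create_or_update"
    push_map_as_event_on_timeout => true
    # Hypothetical field name: store the task id separately so it does
    # not overwrite src_ip in the aggregated event.
    timeout_task_id_field => "aggregation_key"
    # Hypothetical tag to mark aggregated events.
    timeout_tags => ["aggregated"]
    timeout => 300
    inactivity_timeout => 30
  }
}
```

With this shape, each aggregated event would be emitted once the group has been inactive for 30 seconds (or after 300 seconds at most) and would carry total_hits next to the copied fields. If the original events must still be forwarded as well, removing the event.cancel() line keeps them.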