How to add aggrigate map field to the original message as a additional field

I have huge amount of logs coming from Cisco ASA firewall specially firewall deny. for example i have more than 10 messages from same source IP accessing same destination with same destination port. So the idea is , instead sending 10 different messages to elastic, i want to send one aggregated message with the additional field of total_hits.

my original message is like below (multiple within short period of time, say 10 event per second)

04 10 2020 16:00:34 10.1.254.26 <LOC6:INFO> Apr 10 16:00:34 host1.abc.com %ASA-6-106100: access-list GLOBAL_ACCESS_IN denied tcp OUTSIDE/1.1.1.1(51595) -> INSIDE/2.2.2.2(873) hit-cnt 1 first hit [0x93138c00, 0x00000000]

here is my logstash config file

input {
  udp {
     host => "10.0.2.15"
     port => 514 
     type => "syslog"
    }
}

filter {
     grok {
      match => {"message" => " %{DATA:Timestamp} %{IP:Firewall_IP} %{DATA:loglevel} %{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{DATA:FW} %{WORD:policy_id} %{CISCO_ACTION:action} %{WORD:protocol} %{DATA:src_interface}\/%{IP:src_ip}\(%{INT:src_port}\) %{DATA} %{DATA:dst_interface}\/%{IP:dst_ip}\(%{INT:dst_port}\) %{DATA} %{INT:hit_count} %{CISCO_INTERVAL:interval} \[%{DATA:hashcode1}, %{DATA:hashcode2}\]"}
    }
    mutate {
	remove_field => ["@timestamp","@version","path","host"]
	}
aggregate {
   task_id => "%{src_ip}_%{dst_ip}_%{dst_port}"
   code => "map['total_hits'] ||= 0; map['total_hits'] += 1;"
   map_action => "create_or_update"
   push_map_as_event_on_timeout => true
   timeout_task_id_field => "src_ip"
   timeout => 300
   inactivity_timeout => 30
 }
}

output {
  stdout { codec => "rubydebug" }
 
}

Above codes added below line at the end of every message

 {
          "@version" => "1",
        "total_hits" => 4,
        "@timestamp" => 2020-04-12T13:01:01.942Z,
            "src_ip" => "1.1.1.1_2.2.2.2_333"
    }

However , what i wants to achieve to add new field called total_hit at the end of the original message.

any help ?

2 Likes

This creates a new event containing whatever you have inserted into the map. Exactly what you are seeing.

You could have the aggregate filter append a field to every event, but it would only be the number of hits it has seen so far, not the total number of hits. You cannot expect the filter to look into the future.

@Badger Thanks for the reply.

But my requirement is to append the map_field (total-hits) to the original message,

Also my assumption over inactivity_timeout => 30 will be provide me the total aggregation during the 30 second. Correct me if I'm wrong.

You might be able to do that by using aggregate and then using the output of the aggregate to create upserts to each document that has previously been inserted into elasticsearch.

@Badger Thanks for the clue. I was able to change the code as below and achieve the objective.

input {
    
    udp {
         host => "10.0.2.15"
         port => 514
         type => "syslog"
        }
    }
    filter {
         grok {
          match => {"message" => " %{DATA:Timestamp} %{IP:Firewall_IP} %{DATA:loglevel} %{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{DATA:FW} %{WORD:policy_id} %{CISCO_ACTION:action} %{WORD:protocol} %{DATA:src_interface}\/%{IP:src_ip}\(%{INT:src_port}\) %{DATA} %{DATA:dst_interface}\/%{IP:dst_ip}\(%{INT:dst_port}\) %{DATA} %{INT:hit_count} %{CISCO_INTERVAL:interval} \[%{DATA:hashcode1}, %{DATA:hashcode2}\]"}
        }
        mutate {
    	remove_field => ["@timestamp","@version","path","host"]
    	}
    aggregate {
       task_id => "%{src_ip}_%{dst_ip}_%{dst_port}"
       code => "map['total_hits'] ||= 0; map['total_hits'] += 1;
    	    map['message'] ||= event.get('message');
    	    map['status'] ||= 'Final_Message'
    	   "
       map_action => "create_or_update"
       push_map_as_event_on_timeout => true
       timeout_task_id_field => "taskID"
       timeout => 30
       inactivity_timeout => 3
    }
    mutate {
    	remove_field => ["@timestamp","@version","taskID"]
    	
    	}
    }
    output {
    	if [status] == "Final_Message"
    	{
    	stdout { codec => "rubydebug" }
    	}
    }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.