I need to aggregate two different sets of Cisco ASA logs: one for certificate verification, the other for the firewall.
### TOKEN PRESENCE CHECK (717022 - cert validated, THEN 722051 - IP assigned)
# To match on task_id, user.name was added to 717022 during parsing from the certificate and equals %{cn}
# If an IP is assigned but cn does not exist, is_incident => true
if [ASA_NMBR] in ["717022", "722051"]
{
aggregate
{
task_id => "%{user.name}"
code => "
map['Count'] ||= 0
map['Count'] += 1
map['cn'] || map['cn'] = event.get('cn')
map['user.name'] || map['user.name'] = event.get('user.name')
map['source.ip'] || map['source.ip'] = event.get('source.ip')
map['@timestamp'] || map['@timestamp'] = event.get('@timestamp')
map['@log_type'] || map['@log_type'] = event.get('@log_type')
map['source.address'] || map['source.address'] = event.get('source.address')
map['@metadata.target_index'] || map['@metadata.target_index'] = event.get('@metadata.target_index')
map['msg'] ||= ''
if event.get('ASA_NMBR') == '722051' and map['cn'] != map['user.name']
map['is_incident'] = true
end
if ! event.get('message').nil?
map['msg'] = map['msg'] + ' => ' + event.get('message')
end
"
push_map_as_event_on_timeout => true
timeout => 1500
inactivity_timeout => 1000
timeout_code =>
"
event.tag('aggregated')
event.set('asa-vpn_token-checked', true )
event.set('[@metadata][target_index]', 'p-cisco-asa')
"
}
}
else
{
### GENERIC AGGREGATION
aggregate {
task_id => "%{@log_source} %{ASA_NMBR} %{event.action} %{protocol} %{interface} %{source.ip} %{source.port} %{destination.ip} %{destination.port} %{user.name}"
code => "
map['hits'] = event.get('hit_count')
map['Count'] ||= 0
h = event.get('hit_count')
if h.nil? || h == 0
map['Count'] += 1
else
map['Count'] += h.to_i
end
map['protocol'] || map['protocol'] = event.get('protocol')
map['interface'] || map['interface'] = event.get('interface')
map['source.port'] || map['source.port'] = event.get('source.port')
map['source.address'] || map['source.address'] = event.get('source.address')
map['@metadata.target_index'] || map['@metadata.target_index'] = event.get('@metadata.target_index')
map['destination.port'] || map['destination.port'] = event.get('destination.port')
map['@log_source'] || map['@log_source'] = event.get('@log_source')
map['@timestamp'] || map['@timestamp'] = event.get('@timestamp')
map['@log_type'] || map['@log_type'] = event.get('@log_type')
map['@log_host'] || map['@log_host'] = event.get('@log_host')
map['user.name'] || map['user.name'] = event.get('user.name')
map['msg_plus'] || map['msg_plus'] = event.get('message')
map['ASA_NMBR'] || map['ASA_NMBR'] = event.get('ASA_NMBR')
map['event.action'] || map['event.action'] = event.get('event.action')
if event.get('source.ip') != nil
if event.get('source.ip').match(/^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)
map['source.ip'] = event.get('source.ip')
else
map['source.ip_name'] = event.get('source.ip')
end
end
if event.get('destination.ip') != nil
if event.get('destination.ip').match(/^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)
map['destination.ip'] = event.get('destination.ip')
else
map['destination.ip_name'] = event.get('destination.ip')
end
end
"
push_map_as_event_on_timeout => true
timeout => 10
inactivity_timeout => 5
timeout_code =>
"
event.tag('aggregated')
event.set('[@metadata][target_index]', 'p-cisco-asa')
"
}
}
When restarting Logstash, this error sometimes appears:
For task_id pattern %{user.name}, inactivity_timeout must be lower than timeout>
Is it possible to use more than one aggregate block?
If not, how can I solve this task?
The main idea is to merge different messages into one. For example, the first message has user.name and the cert name, and the second has user.name and the assigned IP. The messages can arrive with a time lag.
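To make the idea concrete, here is a minimal plain-Ruby sketch of the merge I am after, run outside Logstash. The `merge_by_user` helper and the sample events (user names, IPs) are hypothetical and made up for illustration; only the field names follow the config above.

```ruby
# Plain-Ruby sketch of the intended correlation; no Logstash involved.
# Event hashes and their values are invented for illustration only.
def merge_by_user(events)
  maps = {}
  events.each do |event|
    # One map per user.name, like task_id => "%{user.name}".
    map = (maps[event['user.name']] ||= { 'Count' => 0 })
    map['Count'] += 1
    # Keep the first non-nil value seen for each field.
    %w[cn user.name source.ip].each do |field|
      map[field] ||= event[field]
    end
    # An IP assignment (722051) without a matching certificate CN is an incident.
    if event['ASA_NMBR'] == '722051' && map['cn'] != map['user.name']
      map['is_incident'] = true
    end
  end
  maps
end

events = [
  { 'ASA_NMBR' => '717022', 'user.name' => 'alice', 'cn' => 'alice' },
  { 'ASA_NMBR' => '722051', 'user.name' => 'alice', 'source.ip' => '10.0.0.5' },
  { 'ASA_NMBR' => '722051', 'user.name' => 'bob', 'source.ip' => '10.0.0.6' }
]

merged = merge_by_user(events)
```

Here `alice` has both messages, so her merged map carries the cn and the assigned IP with no incident flag, while `bob` has only the IP assignment and is flagged with `is_incident`.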
Thanks