Hello Everyone,
I am new to Logstash and am using it to process Netflow data. I am trying to aggregate a few events, but I am getting an error with 'task_id' and couldn't figure out what I am doing wrong. I am pasting my filter configuration, and it would be great if I could get some pointers to identify my mistake. Please let me know if any further details are required.
Logstash: 5.4.1
Filter Conf:
filter {
  ruby {
    init => 'require "ipaddr"'
    code => "
      event.set('[netflow][in_bits]', event.get('[netflow][in_bytes]').to_i * 8)
      ipa = event.get('[netflow][ipv4_dst_addr]').to_s
      ipm = event.get('[netflow][dst_mask]').to_s
      timediff = (Time.parse(event.get('[netflow][last_switched]')).to_i) - (Time.parse(event.get('[netflow][first_switched]')).to_i)
      back = String.new('/')
      ips = ipa + back + ipm
      ip = IPAddr.new ips
      bps = event.get('[netflow][in_bits]').to_i / timediff.to_i if timediff.to_i > 0
      event.set('[netflow][prefix]', ip.to_s)
      event.set('[netflow][traffic_rate]', bps)
    "
  }
  aggregate {
    task_id => event.get('[netflow][prefix]').to_s
    code => "
      map['prefix'] = event.get('[netflow][prefix]').to_s
      map['[netflow][traffic_rate]'] ||= 0
      map['[netflow][traffic_rate]'] += event.get('[netflow][traffic_rate]').to_i
      event.set('[netflow][agg_rate]', map['[netflow][traffic_rate]'])
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}
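
The calculation the two filters above are aiming for, as an added stand-alone Ruby example (not part of the original post and not Logstash code): it derives the destination prefix and per-flow bit rate, then sums the rates per prefix. The sample records and the helper names `flow_prefix`, `flow_rate` and `aggregate_rates` are constructed for illustration; only the Netflow field names come from the post.

```ruby
require "ipaddr"
require "time"

# Constructed sample records, keyed by the Netflow field names used in the post.
SAMPLE_FLOWS = [
  { "ipv4_dst_addr" => "10.1.2.3", "dst_mask" => 24, "in_bytes" => 1000,
    "first_switched" => "2017-06-01T10:00:00Z", "last_switched" => "2017-06-01T10:00:10Z" },
  { "ipv4_dst_addr" => "10.1.2.77", "dst_mask" => 24, "in_bytes" => 500,
    "first_switched" => "2017-06-01T10:00:02Z", "last_switched" => "2017-06-01T10:00:04Z" },
  # Zero-duration flow: first_switched equals last_switched.
  { "ipv4_dst_addr" => "192.168.5.9", "dst_mask" => 16, "in_bytes" => 64,
    "first_switched" => "2017-06-01T10:00:05Z", "last_switched" => "2017-06-01T10:00:05Z" }
].freeze

# Network address of the destination, e.g. 10.1.2.3 with mask 24 gives 10.1.2.0.
# IPAddr#to_s drops the mask length, so /24 and /16 networks that share an
# address would collapse into one key.
def flow_prefix(flow)
  IPAddr.new("#{flow['ipv4_dst_addr']}/#{flow['dst_mask']}").to_s
end

# Bits per second for one flow, or nil when the flow has no measurable duration
# (this avoids the division by zero the post guards against with `if timediff > 0`).
def flow_rate(flow)
  duration = Time.parse(flow["last_switched"]).to_i - Time.parse(flow["first_switched"]).to_i
  return nil unless duration > 0

  flow["in_bytes"].to_i * 8 / duration
end

# Sum of per-flow rates for each prefix; a nil rate counts as 0, matching the
# `.to_i` applied to traffic_rate in the post's aggregate code.
def aggregate_rates(flows)
  flows.each_with_object(Hash.new(0)) do |flow, totals|
    totals[flow_prefix(flow)] += flow_rate(flow).to_i
  end
end

puts aggregate_rates(SAMPLE_FLOWS).inspect if __FILE__ == $PROGRAM_NAME
```

In the aggregate filter itself, `task_id` is a string setting that takes a field reference such as `"%{[netflow][prefix]}"`, so the per-event key is expressed there rather than with a Ruby call.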