I am using Logstash to ingest NetFlow traffic and, after parsing, I store the data in Kafka. As the volume of NetFlow traffic I ingest increases, so does the likelihood of incorrectly parsed structured data. It doesn't raise errors; instead the parsed results are simply wrong, e.g. misaligned timestamps. My Logstash version is 8.4.3. Here is my configuration file:
input {
  udp {
    host => "1xx.2xx.1xx.1xx"
    port => 38888
    type => "netflow_all"
    codec => netflow
  }
}
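# Note: at high packet rates the udp input can silently drop datagrams
# (including NetFlow v9/IPFIX template packets) before the netflow codec
# sees them, which may be related to the symptoms above. The input plugin
# exposes tuning options for this; the values here are illustrative
# guesses, not tested settings:
#   workers => 4
#   queue_size => 10000
#   receive_buffer_bytes => 16777216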
filter {
  mutate {
    rename => {
      "[netflow][ipv4_src_addr]"  => "src_ip"
      "[netflow][l4_src_port]"    => "src_port"
      "[netflow][ipv4_dst_addr]"  => "dst_ip"
      "[netflow][l4_dst_port]"    => "dst_port"
      "[netflow][in_bytes]"       => "bytes"
      "[netflow][protocol]"       => "proto"
      "[netflow][first_switched]" => "first_switched"
      "[netflow][last_switched]"  => "last_switched"
      "[netflow][flow_records]"   => "flows"
      "[host][ip]"                => "hostip"
    }
    remove_field => ["netflow", "host", "@version"]
  }
  # Shift @timestamp forward by 8 hours (UTC -> UTC+8) via a temporary
  # field, then drop the helper field.
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('@timestamp', event.get('timestamp'))"
  }
  mutate {
    remove_field => ["timestamp"]
  }
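  # For reference, the three filters above should collapse into a single
  # ruby filter with the same effect, assuming the intent is a fixed +8 h
  # shift of the event timestamp:
  # ruby {
  #   code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time + 8*60*60))"
  # }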
}
output {
  if [src_ip] and [dst_ip] and [type] == "netflow_all" {
    kafka {
      bootstrap_servers => "132.225.119.133:9092"
      topic_id => "TP_ALDATA_NETFLOW_CDR"
      batch_size => 500
      codec => json
    }
  }
}
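For reference, after the rename/remove steps an event reaching the Kafka topic should look roughly like this (all values are made up for illustration; @timestamp already carries the +8 h shift, while first_switched/last_switched pass through from the codec untouched):

{
  "@timestamp": "2024-01-01T08:00:00.000Z",
  "type": "netflow_all",
  "src_ip": "192.0.2.10",
  "src_port": 51234,
  "dst_ip": "198.51.100.20",
  "dst_port": 443,
  "proto": 6,
  "bytes": 1500,
  "first_switched": "2024-01-01T07:59:58.000Z",
  "last_switched": "2024-01-01T08:00:00.000Z",
  "flows": 1,
  "hostip": "192.0.2.1"
}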