Hello all-
I'm using Logstash to process some Netflow data. It receives the data and ships it to Elasticsearch without a problem, but now I'd like to add an extra field containing the connection duration.
My basic Logstash config file is pretty simple:
input {
  udp {
    port => 12345
    codec => netflow
    type => "netflow"
  }
}
output {
  stdout { codec => "rubydebug" }
  #elasticsearch {
  #  host => "elastichost"
  #  protocol => "http"
  #  index => "logstash_netflow-%{+YYYY.MM.dd}"
  #}
}
And the output:
{
    "@timestamp" => "2015-11-23T16:57:07.000Z",
    "netflow" => {
        "version" => 9,
        "flow_seq_num" => 0,
        "flowset_id" => 1024,
        "ipv4_src_addr" => "xxxx.xxxx.xxxx.xxxx",
        "ipv4_dst_addr" => "yyyy.yyyy.yyyy.yyyy",
        "last_switched" => "2015-12-31T06:36:25.999Z",
        "first_switched" => "2015-12-31T06:35:12.999Z",
        "in_bytes" => 12098,
        "in_pkts" => 215,
        "input_snmp" => 0,
        "output_snmp" => 0,
        "l4_src_port" => 54065,
        "l4_dst_port" => 80,
        "protocol" => 6,
        "tcp_flags" => 26,
        "ip_protocol_version" => 4
    },
    "@version" => "1",
    "type" => "netflow",
    "host" => "elastichost"
}
I tried adding a Ruby filter to compute a date diff of netflow.last_switched - netflow.first_switched, but Ruby complains about nil values:
filter {
  ruby {
    init => "require 'time'"
    code => "event['duration'] = Time.parse(event['netflow.last_switched']) - Time.parse(event['netflow.first_switched'])"
  }
}
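To rule out the date math itself, I ran a quick standalone check of the subtraction using the first_switched/last_switched values from the event above. This works fine outside Logstash, so the problem seems to be how I'm referencing the fields inside the event:

```ruby
require 'time'

# Timestamps copied from the rubydebug output above
first_switched = Time.parse("2015-12-31T06:35:12.999Z")
last_switched  = Time.parse("2015-12-31T06:36:25.999Z")

# Subtracting two Time objects gives the difference in seconds as a Float
duration = last_switched - first_switched
puts duration  # 73.0
```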
I've also tried copying netflow.first_switched and netflow.last_switched to new fields with mutate, in case Ruby didn't like the dotted event field names, but that doesn't work either:
mutate {
  add_field => { "flow_start" => "%{netflow.first_switched}" }
  add_field => { "flow_end" => "%{netflow.last_switched}" }
}
It seems that I'm copying the literal name of "netflow.first_switched" into the new field rather than the value.
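From the Logstash field-reference docs it sounds like nested fields may need bracket syntax rather than dots. Is this what I should be doing instead (untested guess on my part)?

```
mutate {
  add_field => { "flow_start" => "%{[netflow][first_switched]}" }
  add_field => { "flow_end" => "%{[netflow][last_switched]}" }
}
```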
So what obvious thing am I missing to get the duration of the flow added into my data?