I have netflow and ipfix data coming in from logstash. When logstash tries to send this to elasticsearch I get these errors.
I tried to tidy up my code using the alter filter instead of all the ifs' and mutate. I am getting the same error no matter what I do when I either enable the alter filter, or if I enable two of the if filters.
Idealy I want to send both ipv6 and ipv4 to the same flow.field.
One flow should only have either one of ipfix.srcipv4, netflow.srcipv4, ipfix.srcipv6, netflow.srcipv6.
This is my config:
# alter { coalesce => [ "[flow][Source IP]", "[netflow][ipv4_src_addr]", "[netflow][ipv6_src_addr]", "[ipfix][sourceIPv4Address]", "[ipfix][sourceIPv6Address]" ] }
if "[netflow][ipv4_src_addr]" { mutate { add_field => { "[flow][Source IPv4]" => "%{[netflow][ipv4_src_addr]}" } } }
if "[netflow][ipv6_src_addr]" { mutate { add_field => { "[flow][Source IPv6]" => "%{[netflow][ipv6_src_addr]}" } } }
# if "[ipfix][sourceIPv4Address]" { mutate { add_field => { "[flow][Source IPv4]" => "%{[ipfix][sourceIPv4Address]}" } } }
# if "[ipfix][sourceIPv6Address]" { mutate { add_field => { "[flow][Source IPv6]" => "%{[ipfix][sourceIPv6Address]}" } } }
# alter { coalesce => [ "[flow][Destination IP]", "[netflow][ipv4_dst_addr]", "[netflow][ipv6_dst_addr]", "[ipfix][destinationIPv4Address]", "[ipfix][destinationIPv6Address]" ] }
# if "[netflow][ipv4_dst_addr]" { mutate { add_field => { "[flow][Destination IP]" => "%{[netflow][ipv4_dst_addr]}" } } }
# if "[netflow][ipv6_dst_addr]" { mutate { add_field => { "[flow][Destination IP]" => "%{[netflow][ipv6_dst_addr]}" } } }
# if "[ipfix][destinationIPv4Address]" { mutate { add_field => { "[flow][Destination IP]" => "%{[ipfix][destinationIPv4Address]}" } } }
# if "[ipfix][destinationIPv6Address]" { mutate { add_field => { "[flow][Destination IP]" => "%{[ipfix][destinationIPv6Address]}" } } }
These are the errors I am getting.
[2017-01-05T11:26:36,193][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"netflow-2017.01.05", :_type=>"netflow", :_routing=>nil}, 2017-01-05T10:26:36.000Z 172.23.253.1 %{message}], :response=>{"index"=>{"_index"=>"netflow-2017.01.05", "_type"=>"netflow", "_id"=>"AVluKoAe3zwQr38-kLpd", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [flow.Source IPv6]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"'%{[netflow][ipv6_src_addr]}' is not an IP string literal."}}}}}
[2017-01-06T12:15:52,755][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"netflow-2017.01.06", :_type=>"netflow", :_routing=>nil}, 2017-01-06T11:15:52.000Z 172.23.253.1 %{message}], :response=>{"index"=>{"_index"=>"netflow-2017.01.06", "_type"=>"netflow", "_id"=>"AVlzffky3zwQr38-k784", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [flow.Source IPv4]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"'%{[netflow][ipv4_src_addr]}' is not an IP string literal."}}}}}
My mappings in elasticsearch looks like this:
PUT _template/netflow
{
"template": "netflow-*",
"settings": { "index.refresh_interval": "5s" },
"mappings": {
"_default_": {
"_all": { "enabled": false},
"properties": {
"@timestamp": { "index": "analyzed", "type": "date"},
"@version": { "index": "analyzed", "type": "integer"},
"host": { "index": "analyzed", "type": "ip"},
"flow": {
"dynamic": true,
"type": "object",
"properties" : {
"IP version": { "index": "analyzed", "type": "integer"},
"Protocol Name": { "index": "analyzed", "type": "string"},
"Source IP": { "index": "analyzed", "type": "ip"},
"Source IPv4": { "index": "analyzed", "type": "ip"},
"Source IPv6": { "index": "analyzed", "type": "ip"},
"Destination IP": { "index": "analyzed", "type": "ip"},
"Source Port": { "index": "analyzed", "type": "integer"},
"Destination Port": { "index": "analyzed", "type": "integer"},
"Bytes in": { "index": "analyzed", "type": "long" },
"Packets in": { "index": "analyzed", "type": "long" }
}
},
"ipfix": {
"dynamic": true,
"type": "object",
"properties" : {
"version": { "index": "analyzed", "type": "integer" },
"sourceIPv4Address": { "index": "analyzed", "type": "ip" },
"destinationIPv4Address": { "index": "analyzed", "type": "ip" },
"postNATSourceIPv4Address": { "index": "analyzed", "type": "ip" },
"postNATDestinationIPv4Address": { "index": "analyzed", "type": "ip" },
"ipNextHopIPv4Address": { "index": "analyzed", "type": "ip" },
"sourceIPv6Address": { "index": "analyzed", "type": "ip" },
"destinationIPv6Address": { "index": "analyzed", "type": "ip" },
"ipNextHopIPv6Address": { "index": "analyzed", "type": "ip" },
.....