Mapping Netflow Data

Hi All,

Just some context, I originally raised this issue in Kibana's issue tracker, but was told that it's an Elasticsearch issue and that it would be best to raise it here.

When attempting to view my Netflow data using Kibana 4.0.2 (Build 6004), I get the following warning in a yellow banner:

Courier Fetch: 5 of 5 shards failed.

Using Chrome's Developer tools I pulled out the query, payload, and response. All of which are below.

I can view this data in Kibana 4 if I let Elasticsearch create a mapping dynamically for this index, but I would like to use a custom one so that it's optimized. Also, the generated template produces a lot of parsing errors in Elasticsearch's logs (Numeric value out of range of long, etc...). I've included the mapping I'm trying to use below, and I don't see anything wrong with it. Netflow data includes bigger numbers than "type": "long" can handle, so I needed to use "type": "string" for some fields.

Request

curl -XPOST 'http://fqdn.omitted.com:5601/elasticsearch/_msearch?timeout=0&ignore_unavailable=true&preference=1432705287095'

Request Payload

{
  "index": "customindex-*",
  "ignore_unavailable": true
}\n
{
  "size": 500,
  "sort": {
    "@timestamp": "desc"
  },
  "highlight": {
    "pre_tags": [
      "@kibana-highlighted-field@"
    ],
    "post_tags": [
      "@/kibana-highlighted-field@"
    ],
    "fields": {
      "*": {}
    }
  },
  "aggs": {
    "2": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "30s",
        "pre_zone": "-07:00",
        "pre_zone_adjust_large_interval": true,
        "min_doc_count": 0,
        "extended_bounds": {
          "min": 1432704390127,
          "max": 1432705290128
        }
      }
    }
  },
  "query": {
    "filtered": {
      "query": {
        "match_all": {}
      },
      "filter": {
        "bool": {
          "must": [
            {
              "range": {
                "@timestamp": {
                  "gte": 1432704390134,
                  "lte": 1432705290134
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
  },
  "fields": [
    "*",
    "_source"
  ],
  "script_fields": {},
  "fielddata_fields": [
    "@timestamp"
  ]
}

Response

{
  "responses": [
    {
      "took": 44,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 0,
        "failed": 5,
        "failures": [
          {
            "index": "customindex-2015.05.27",
            "shard": 0,
            "status": 500,
            "reason": "RemoteTransportException[[fqdn.omitted.com][inet[/192.168.1.110:9300]][indices:data/read/search[phase/fetch/id]]]; nested: ElasticsearchIllegalStateException[No matching token for number_type [BIG_INTEGER]]; "
          },
          {
            "index": "customindex-2015.05.27",
            "shard": 1,
            "status": 500,
            "reason": "RemoteTransportException[[fqdn.omitted.com][inet[/192.168.1.110:9300]][indices:data/read/search[phase/fetch/id]]]; nested: ElasticsearchIllegalStateException[No matching token for number_type [BIG_INTEGER]]; "
          },
          {
            "index": "customindex-2015.05.27",
            "shard": 2,
            "status": 500,
            "reason": "RemoteTransportException[[fqdn.omitted.com][inet[/192.168.1.110:9300]][indices:data/read/search[phase/fetch/id]]]; nested: ElasticsearchIllegalStateException[No matching token for number_type [BIG_INTEGER]]; "
          },
          {
            "index": "customindex-2015.05.27",
            "shard": 3,
            "status": 500,
            "reason": "RemoteTransportException[[fqdn.omitted.com][inet[/192.168.112.177:9300]][indices:data/read/search[phase/fetch/id]]]; nested: ElasticsearchIllegalStateException[No matching token for number_type [BIG_INTEGER]]; "
          },
          {
            "index": "customindex-2015.05.27",
            "shard": 4,
            "status": 500,
            "reason": "RemoteTransportException[[fqdn.omitted.com][inet[/192.168.1.110:9300]][indices:data/read/search[phase/fetch/id]]]; nested: ElasticsearchIllegalStateException[No matching token for number_type [BIG_INTEGER]]; "
          }
        ]
      },
      "hits": {
        "total": 86519,
        "max_score": null,
        "hits": []
      },
      "aggregations": {
        "2": {
          "buckets": [
            {
              "key_as_string": "2015-05-27T05:16:30.000Z",
              "key": 1432703790000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:17:00.000Z",
              "key": 1432703820000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:17:30.000Z",
              "key": 1432703850000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:18:00.000Z",
              "key": 1432703880000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:18:30.000Z",
              "key": 1432703910000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:19:00.000Z",
              "key": 1432703940000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:19:30.000Z",
              "key": 1432703970000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:20:00.000Z",
              "key": 1432704000000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:20:30.000Z",
              "key": 1432704030000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:21:00.000Z",
              "key": 1432704060000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:21:30.000Z",
              "key": 1432704090000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:22:00.000Z",
              "key": 1432704120000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:22:30.000Z",
              "key": 1432704150000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:23:00.000Z",
              "key": 1432704180000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:23:30.000Z",
              "key": 1432704210000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:24:00.000Z",
              "key": 1432704240000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:24:30.000Z",
              "key": 1432704270000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:25:00.000Z",
              "key": 1432704300000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:25:30.000Z",
              "key": 1432704330000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:26:00.000Z",
              "key": 1432704360000,
              "doc_count": 0
            },
            {
              "key_as_string": "2015-05-27T05:26:30.000Z",
              "key": 1432704390000,
              "doc_count": 4209
            },
            {
              "key_as_string": "2015-05-27T05:27:00.000Z",
              "key": 1432704420000,
              "doc_count": 7270
            },
            {
              "key_as_string": "2015-05-27T05:27:30.000Z",
              "key": 1432704450000,
              "doc_count": 6646
            },
            {
              "key_as_string": "2015-05-27T05:28:00.000Z",
              "key": 1432704480000,
              "doc_count": 7181
            },
            {
              "key_as_string": "2015-05-27T05:28:30.000Z",
              "key": 1432704510000,
              "doc_count": 6612
            },
            {
              "key_as_string": "2015-05-27T05:29:00.000Z",
              "key": 1432704540000,
              "doc_count": 6753
            },
            {
              "key_as_string": "2015-05-27T05:29:30.000Z",
              "key": 1432704570000,
              "doc_count": 6509
            },
            {
              "key_as_string": "2015-05-27T05:30:00.000Z",
              "key": 1432704600000,
              "doc_count": 10295
            },
            {
              "key_as_string": "2015-05-27T05:30:30.000Z",
              "key": 1432704630000,
              "doc_count": 13073
            },
            {
              "key_as_string": "2015-05-27T05:31:00.000Z",
              "key": 1432704660000,
              "doc_count": 14627
            },
            {
              "key_as_string": "2015-05-27T05:31:30.000Z",
              "key": 1432704690000,
              "doc_count": 3344
            }
          ]
        }
      }
    }
  ]
}

Template & Mapping

{
  "template": "customindex-*",
  "settings": {
    "index.refresh_interval": "5s",
    "index.number_of_shards": "5"
  },
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": false
      }
    },
    "netflow": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "dateOptionalTime"
        },
        "@version": {
          "type": "string"
        },
        "host": {
          "type": "string"
        },
        "netflow": {
          "properties": {
            "flow_seq_num": {
              "type": "long"
            },
            "flowset_id": {
              "type": "long"
            },
            "nf_f_conn_id": {
              "type": "long"
            },
            "nf_f_dst_addr_ipv4": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_dst_intf_id": {
              "type": "long"
            },
            "nf_f_dst_port": {
              "type": "long"
            },
            "nf_f_egress_acl_id": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_event_time_msec": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_flow_create_time_msec": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_fwd_flow_delta_bytes": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_rev_flow_delta_bytes": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_flow_bytes": {
              "type": "long"
            },
            "nf_f_fw_event": {
              "type": "long"
            },
            "nf_f_fw_ext_event": {
              "type": "long"
            },
            "nf_f_icmp_code": {
              "type": "long"
            },
            "nf_f_icmp_type": {
              "type": "long"
            },
            "nf_f_icmp_type_ipv6": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_icmp_code_ipv6": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_ingress_acl_id": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_protocol": {
              "type": "long"
            },
            "nf_f_src_addr_ipv4": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_src_intf_id": {
              "type": "long"
            },
            "nf_f_src_port": {
              "type": "long"
            },
            "nf_f_username": {
              "type": "string"
            },
            "nf_f_xlate_dst_addr_ipv4": {
              "type": "string"
            },
            "nf_f_xlate_dst_port": {
              "type": "long"
            },
            "nf_f_xlate_src_addr_ipv4": {
              "type": "string",
              "index": "not_analyzed"
            },
            "nf_f_xlate_src_port": {
              "type": "long"
            },
            "version": {
              "type": "long"
            }
          }
        },
        "type": {
          "type": "string"
        }
      }
    }
  }
}

Sample Document

{
    "@timestamp" => "2015-05-27T06:51:08.000Z",
       "netflow" => {
                         "version" => 9,
                    "flow_seq_num" => 2196973,
                      "flowset_id" => 263,
                    "nf_f_conn_id" => 21064372,
              "nf_f_src_addr_ipv4" => 2836759729,
                   "nf_f_src_port" => 51349,
                "nf_f_src_intf_id" => 15,
              "nf_f_dst_addr_ipv4" => 2866430306,
                   "nf_f_dst_port" => 80,
                "nf_f_dst_intf_id" => 14,
                   "nf_f_protocol" => 6,
                  "nf_f_icmp_type" => 0,
                  "nf_f_icmp_code" => 0,
        "nf_f_xlate_src_addr_ipv4" => 2856329729,
        "nf_f_xlate_dst_addr_ipv4" => 2856430306,
             "nf_f_xlate_src_port" => 51349,
             "nf_f_xlate_dst_port" => 80,
                   "nf_f_fw_event" => 2,
               "nf_f_fw_ext_event" => 2015,
            "nf_f_event_time_msec" => 1452309468866,
                 "nf_f_flow_bytes" => 45
    },
      "@version" => "1",
          "type" => "netflow",
          "host" => "192.168.1.100"
}

Figured it out, or a workaround at least. I just needed to convert all/most fields to string with Logstash before I shipped off the logs to elasticsearch.

filter {
  mutate { convert => { "[netflow][version]" => "string" } }
  mutate { convert => { "[netflow][flow_seq_num]" => "string" } }
  mutate { convert => { "[netflow][flowset_id]" => "string" } }
  mutate { convert => { "[netflow][nf_f_conn_id]" => "string" } }
  mutate { convert => { "[netflow][nf_f_src_addr_ipv4]" => "string" } }
  mutate { convert => { "[netflow][nf_f_src_port]" => "string" } }
  mutate { convert => { "[netflow][nf_f_src_intf_id]" => "string" } }
  mutate { convert => { "[netflow][nf_f_dst_addr_ipv4]" => "string" } }
  mutate { convert => { "[netflow][nf_f_dst_port]" => "string" } }
  mutate { convert => { "[netflow][nf_f_dst_intf_id]" => "string" } }
  mutate { convert => { "[netflow][nf_f_protocol]" => "string" } }
  mutate { convert => { "[netflow][nf_f_icmp_type]" => "string" } }
  mutate { convert => { "[netflow][nf_f_icmp_code]" => "string" } }
  mutate { convert => { "[netflow][nf_f_xlate_src_addr_ipv4]" => "string" } }
  mutate { convert => { "[netflow][nf_f_xlate_dst_addr_ipv4]" => "string" } }
  mutate { convert => { "[netflow][nf_f_xlate_src_port]" => "string" } }
  mutate { convert => { "[netflow][nf_f_xlate_dst_port]" => "string" } }
  mutate { convert => { "[netflow][nf_f_fw_event]" => "string" } }
  mutate { convert => { "[netflow][nf_f_fw_ext_event]" => "string" } }
  mutate { convert => { "[netflow][nf_f_event_time_msec]" => "string" } }
  mutate { convert => { "[netflow][nf_f_flow_bytes]" => "string" } }
  mutate { convert => { "[netflow][nf_f_fwd_flow_delta_bytes]" => "string" } }
  mutate { convert => { "[netflow][nf_f_rev_flow_delta_bytes]" => "string" } }
  mutate { convert => { "[netflow][nf_f_flow_create_time_msec]" => "string" } }
  mutate { convert => { "[netflow][nf_f_ingress_acl_id]" => "string" } }
  mutate { convert => { "[netflow][nf_f_egress_acl_id]" => "string" } }
  mutate { convert => { "[netflow][nf_f_username]" => "string" } }
}

You can do those mutates in one call, e.g.:

mutate {
    convert => [ "[netflow][version]", "string", "[netflow][flow_seq_num]", "string" ]  # etc. for the remaining fields
  }

Or you should be able to; I'm not sure how it handles nested fields, to be honest!

Thanks for the tip. This was the only way I could make it work all in one mutate block:

  mutate { 
    convert => [ "[netflow][version]", "string" ]
    convert => [ "[netflow][flow_seq_num]", "string" ]
    convert => [ "[netflow][flowset_id]", "string" ]
    convert => [ "[netflow][nf_f_conn_id]", "string" ]
    convert => [ "[netflow][nf_f_src_addr_ipv4]", "string" ]
    convert => [ "[netflow][nf_f_src_port]", "string" ]
    convert => [ "[netflow][nf_f_src_intf_id]", "string" ]
    convert => [ "[netflow][nf_f_dst_addr_ipv4]", "string" ]
    convert => [ "[netflow][nf_f_dst_port]", "string" ]
    convert => [ "[netflow][nf_f_dst_intf_id]", "string" ]
    convert => [ "[netflow][nf_f_protocol]", "string" ]
    convert => [ "[netflow][nf_f_icmp_type]", "string" ]
    convert => [ "[netflow][nf_f_icmp_code]", "string" ]
    convert => [ "[netflow][nf_f_xlate_src_addr_ipv4]", "string" ]
    convert => [ "[netflow][nf_f_xlate_dst_addr_ipv4]", "string" ]
    convert => [ "[netflow][nf_f_xlate_src_port]", "string" ]
    convert => [ "[netflow][nf_f_xlate_dst_port]", "string" ]
    convert => [ "[netflow][nf_f_fw_event]", "string" ]
    convert => [ "[netflow][nf_f_fw_ext_event]", "string" ]
    convert => [ "[netflow][nf_f_event_time_msec]", "string" ]
    convert => [ "[netflow][nf_f_flow_bytes]", "string" ]
    convert => [ "[netflow][nf_f_fwd_flow_delta_bytes]", "string" ]
    convert => [ "[netflow][nf_f_rev_flow_delta_bytes]", "string" ]
    convert => [ "[netflow][nf_f_flow_create_time_msec]", "string" ]
    convert => [ "[netflow][nf_f_ingress_acl_id]", "string" ]
    convert => [ "[netflow][nf_f_egress_acl_id]", "string" ]
    convert => [ "[netflow][nf_f_username]", "string" ]
  }

Hi Ginja,

Not sure why you want everything as a string; you are losing a lot of functionality, like IP range and SUM per destination port, for example.

Here is my Elasticsearch template for netflow. See if that helps you.

Works fine for me ...

curl -XPUT localhost:9200/_template/netflow -d '
{
"order" : 2,
"template" : "logstash-ptc-netflow*",
"mappings" : {
"netflow" : {
"dynamic_templates" : [ {
"message_field" : {
"mapping" : {
"index" : "analyzed",
"omit_norms" : true,
"type" : "string"
},
"match" : "message",
"match_mapping_type" : "string"
}
}, {
"string_fields" : {
"mapping" : {
"index" : "analyzed",
"omit_norms" : true,
"type" : "string",
"fields" : {
"raw" : {
"index" : "not_analyzed",
"ignore_above" : 256,
"type" : "string"
}
}
},
"match" : "*",
"match_mapping_type" : "string"
}
} ],
"_all" : {
"enabled" : true
},
"properties" : {
"@version" : {
"type" : "string",
"index" : "not_analyzed",
"doc_values": true
},
"@timestamp" : {
"type" : "date",
"format" : "dateOptionalTime",
"doc_values": true
},
"type" : {
"type" : "string",
"index" : "not_analyzed",
"doc_values": true
},
"flow_seq_num": {
"index": "not_analyzed",
"type": "long",
"doc_values": true
},
"engine_type": {
"index": "not_analyzed",
"type": "integer",
"doc_values": true
},
"engine_id": {
"index": "not_analyzed",
"type": "integer",
"doc_values": true
},
"sampling_algorithm": {
"index": "not_analyzed",
"type": "integer",
"doc_values": true
},
"sampling_interval": {
"index": "not_analyzed",
"type": "integer",
"doc_values": true
},
"flow_records": {
"index": "not_analyzed",
"type": "integer",
"doc_values": true
},
"ipv4_src_addr": {
"index": "analyzed",
"type": "ip",
"doc_values": true
},
"ipv4_dst_addr": {
"index": "analyzed",
"type": "ip",
"doc_values": true
},
"ipv4_next_hop": {
"index": "analyzed",
"type": "ip",
"doc_values": true
},
"input_snmp": {
"index": "not_analyzed",
"type": "long",
"doc_values": true
},
"output_snmp": {
"index": "not_analyzed",
"type": "long",
"doc_values": true
},
"in_pkts": {
"index": "analyzed",
"type": "long",
"doc_values": true
},
"in_bytes": {
"index": "analyzed",
"type": "long",
"doc_values": true
},
"first_switched": {
"index": "not_analyzed",
"type": "date",
"doc_values": true
},
"last_switched": {
"index": "not_analyzed",
"type": "date",
"doc_values": true
},
"l4_src_port": {
"index": "analyzed",
"type": "long",
"doc_values": true
},
"l4_dst_port": {
"index": "analyzed",
"type": "long",
"doc_values": true
},
"tcp_flags": {
"index": "analyzed",
"type": "integer",
"doc_values": true
},
"protocol": {
"index": "analyzed",
"type": "integer",
"doc_values": true
},
"src_tos": {
"index": "analyzed",
"type": "integer",
"doc_values": true
},
"src_as": {
"index": "analyzed",
"type": "integer",
"doc_values": true
},
"dst_as": {
"index": "analyzed",
"type": "integer",
"doc_values": true
},
"src_mask": {
"index": "analyzed",
"type": "integer",
"doc_values": true
},
"dst_mask": {
"index": "analyzed",
"type": "integer",
"doc_values": true
}
}
}
},
"settings" : {
"index": {
"refresh_interval" : "5s",
"store.throttle.max_bytes_per_sec" : "200mb",
"translog.flush_threshold_size": "200mb"
}
}
}'