Logstash + Fortinet + Kibana

Hello to All,

I'm trying to create Kibana map using data from Fortinet syslog and Logstash.

I was able to load geoip data to kibana, however geo.location field had to be created from Logstash because it was not created automatically.

My Logstash config looks like this at the moment:

input {
  udp {
    port => 5004
    type => fortinet
  }
}

filter {
  if [type] == "fortinet" {
    grok {
      match => {"message" => "%{SYSLOG5424PRI}%{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
      remove_field => ["@timestamp","@version","event","log"]
    }
    kv {
      source => "message"
      value_split => "="
      field_split => ","
      remove_field => "message"
    }
    mutate {
      rename => { "type" => "ftg_type" }
      rename => { "subtype" => "ftg_subtype" }
      add_field => { "type" => "fortinet" }
      add_field => { "logdate" => "%{date} %{time}" }
      convert => { "rcvdbyte" => "integer" }
      convert => { "sentbyte" => "integer" }
    }
    date {
      match => [ "logdate", "yyyy-MM-dd HH:mm:ss" ]
      timezone => "Europe/Paris"
      target => "@timestamp"
    }
    mutate {
      remove_field => ["date","time"]
    }
    date_formatter {
      source => "@timestamp"
      target => "log_day"
      pattern => "YYYY.MM.dd"
    }
    if [srcip] {
      if [srcip] !~ /^(10.|[a-f])/ {
        geoip {
          source => "srcip"
          target => "src_geoip"
        }
        if [src_geoip] {
          mutate {
            add_field => [ "[src_geoip][location]", "%{[src_geoip][geo][location][lat]}, %{[src_geoip][geo][location][lon]}" ]
			
          }
        }
      }
    }
    if [dstip] {
      if [dstip] !~ /^(10.|[a-f])/ {
        geoip {
          source => "dstip"
          target => "dst_geoip"
        }
        if [dst_geoip] {
          mutate {
            add_field => [ "[dst_geoip][location]", "%{[dst_geoip][geo][location][lat]}, %{[dst_geoip][geo][location][lon]}" ]
          }
        }
      }
    }
 }
}


output {
 if [type] == "fortinet" {
   elasticsearch {
     hosts => ["https://localhost:9200"]
     user => "logstash_internal"
     password => ""
     ssl => true
     ssl_certificate_verification => false
     index => "fortinet-%{+YYYY.MM.dd}"
   }
   file {
     path => "/log/%{log_day}/fortinet/%{devname}/%{devname}-%{[host][ip]}.gzip"
     gzip => true
   }
 }
}

Also modified template for this index in Kibana:

{
  "properties": {
    "dst_geoip.​location": {
      "ignore_malformed": false,
      "type": "geo_point",
      "ignore_z_value": false
    },
    "src_geoip.​location": {
      "ignore_malformed": false,
      "type": "geo_point",
      "ignore_z_value": false
    }
  }
}

image

Anyway, I can see this fields in Kibana:

But when I see mappings in specific index it is mapped twice:

image

And field is still displayed as text and can't be selected as geopoint in the MAP:

image

Can anyone help me with that issue?

Best regards.

A field can not be mapped twice, so there is something wrong in your configuration, can you share the entire src_geoip and dst_geoip mapping as plain text using the preformatted text option, not as image?

Probably this is the issue:

{
  "properties": {
    "dst_geoip.​location": {
      "ignore_malformed": false,
      "type": "geo_point",
      "ignore_z_value": false
    },
    "src_geoip.​location": {
      "ignore_malformed": false,
      "type": "geo_point",
      "ignore_z_value": false
    }
  }
}

This creates a mapping for a field dst_geop.location where the dot is parte of the name, this creates a mapping for the following field:

{ "dst_geoip.location": "value" }

What you want is:

{ "dst_geoip": { "location": "value" } } 

Since LS v8+ is ECSv8 is by default, the geoip plugin will work if you set the field name in format: [source][ip], like this:

    geoip {
       source => "[source][ip]"
       ecs_compatibility => "v8" # default
       tag_on_failure => ["Location unknown or similar msg"]
    }

Similar is for [destination][ip] for the destination

If you set ecs_compatibility = disabled then you can lookup directly the
srcip or dstip fields.

  1. For custom geo data you have to create GeoJSON, ECS v8 which should look like this.
	if ([srcip] =~ /^10\./)  {
	    mutate {
         add_field => {
          "[src_geoip][city_name]" => "Baltimore"
          "[src_geoip][continent_code]" => "NA"
          "[src_geoip][continent_name]" => "North America"
          "[src_geoip][country_iso_code]" => "US"
          "[src_geoip][country_name]" => "USA"
          "[src_geoip][location][lon]" => -76.6348
          "[src_geoip][location][lat]" => 39.2851
          "[src_geoip][name]" => "Baltimore HQ"
          "[src_geoip][postal_code]" => "667"
          "[src_geoip][region_iso_code]" => "US-MD"
          "[src_geoip][region_name]" => "Maryland"
          "[src_geoip][timezone]" => "America/Detroit"
         }

        }
	}

ECS v1 looks very similar, check what your LS which the structure will generate.

  1. You must define mappings-dataview. FB is using dynamic structure
   "src_geoip": {
              "properties": {
                "region_iso_code": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "continent_name": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "city_name": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "country_iso_code": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "timezone": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "country_name": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "name": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "continent_code": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "region_name": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "location": {
                  "type": "geo_point"
                },
                "postal_code": {
                  "ignore_above": 1024,
                  "type": "keyword"
                }
              }
            },

For older versions,which include country_code2, country_code3 etc., you can use the structure:

        "src_geoip"  : {
		  "dynamic": true,
		  "properties" : {
			"ip": { "type": "ip" },
			"location" : { "type" : "geo_point" },
			"latitude" : { "type" : "half_float" },
			"longitude" : { "type" : "half_float" }
		  }
        },