Transforming logs into geo_point to draw them in kibana

My goal is to be able to geolocate on a kibana map the connections that interest me.

The problem is that the generated indices do not create the correct type of data for kibana to draw. It would be Geopoints.

The data way would be

Firewall logs –> logstash (filter)-> elasticsearch(index-patern) <-kibana (map)

Logstash (Basically - sort data into fields for later use and use geoip when it's a public ip.)

input {
        # Firewall
        udp {
        port => 5000
        type => "logs"
        tags => ["mikrotik","firewall"]
        id => "firewall"
        }

}

filter {

                if "firewall" in [message] {

                        grok {
                        patterns_dir => ["/etc/logstash/conf.d/patterns/"]
                        match => { "message" => "%{MIKROTIKFIREWALL}"}
                        }
                        # src_ip first check if clientip field even exists                                                                                                                                                                      
                        if [src_ip] {                                                                                                                                                                                                           
                          # then check if it's a internal IP space and if it is add the internalIP tag                                                                                                                                     
                          if [src_ip] !~ /localhost|\-/ {                                                                                                                                                                                       
                                cidr {                                                                                                                                                                                                          
                                  add_tag => [ "src_internalIP" ]                                                                                                                                                                               
                                  address => [ "%{src_ip}" ]                                                                                                                                                                                    
                                  network => [ "192.168.0.0/8", "127.0.0.1" , "255.255.255.255", "0.0.0.0", "239.255.255.250", "172.16.0.0/12", "10.0.0.0/8", "224.0.0.0/4", "240.0.0.0/4"]                                                                                                                                                                             
                                }                                                                                                                                                                                                               
                          }                                                                                                                                                                                                                     
                          # don't run geoip if it's internalIP or localhost (aka only external IPs)                                                                                                                                             
                          if "src_internalIP" not in [tags] and [src_ip] !~ /localhost|\-/ {                                                                                                                                                    
                                        geoip {                                                                                                                                                                                                 
                                        source => "src_ip"                                                                                                                                                                                      
                                        target => "src_geoip"                                                                                                                                                                                   
                                        }                                                                                                                                                                                                       
                          }                                                                                                                                                                                                                     
                        }
                        # dst_ip first check if clientip field even exists                                                                                                                                                                      
                        if [dst_ip] {                                                                                                                                                                                                           
                          # then check if it's in the 192.168.x.x space and if it is add the internalIP tag                                                                                                                                     
                          if [dst_ip] !~ /localhost|\-/ {                                                                                                                                                                                       
                                cidr {                                                                                                                                                                                                          
                                  add_tag => [ "dst_internalIP" ]                                                                                                                                                                               
                                  address => [ "%{dst_ip}" ]                                                                                                                                                                                    
                                  network => [ "192.168.0.0/8", "127.0.0.1" , "255.255.255.255", "0.0.0.0", "239.255.255.250", "172.16.0.0/12", "10.0.0.0/8", "224.0.0.0/4", "240.0.0.0/4"]                                                                                                                                                                             
                                }                                                                                                                                                                                                               
                          }                                                                                                                                                                                                                     
                          # don't run geoip if it's internalIP or localhost (aka only external IPs)                                                                                                                                             
                          if "dst_internalIP" not in [tags] and [dst_ip] !~ /localhost|\-/ {                                                                                                                                                    
                                        geoip {                                                                                                                                                                                                 
                                        source => "dst_ip"                                                                                                                                                                                      
                                        target => "dst_geoip"                                                                                                                                                                                   
                                        }                                                                                                                                                                                                       
                          }                                                                                                                                                                                                                     
                        }
                }
}

output {
   if "firewall" in [message] {
                elasticsearch {
                        hosts => ["https://elastic-vm-1.viten.net:9200"]
                        cacert => '/etc/logstash/certs/ca/ca.crt'
                        user => 'logstash_internal'
                        password => 'B3watermyfriend.'
                        manage_template => true
                        #    template => "/path/to/logstash/logstash-apache.json"
                        template_name => "connections"
                        ilm_rollover_alias => "connection-%{+YYYY.MM.dd}"
                        ilm_pattern => "000001"
                        ilm_policy => "prso_30"
                        #index => "connection-%{+YYYY.MM.dd}"
                        action => "create"
                }
        }
}

Elasticsearch

index templates

PUT _index_template/connections
{
  "version": 2,
  "template": {
    "settings": {
      "index": {
        "number_of_replicas": "1"
      }
    },
    "mappings": {
      "properties": {
        "acction": {
          "type": "keyword"
        },
        "dst_ip": {
          "type": "ip"
        },
        "dst_port": {
          "type": "integer"
        },
        "dst_zone": {
          "type": "keyword"
        },
        "fw_chain": {
          "eager_global_ordinals": false,
          "norms": false,
          "index": true,
          "store": false,
          "type": "keyword",
          "split_queries_on_whitespace": false,
          "index_options": "freqs",
          "doc_values": true
        },
        "host.ip": {
          "type": "ip"
        },
        "infrastructure": {
          "type": "keyword"
        },
        "length": {
          "type": "integer"
        },
        "loglevel": {
          "eager_global_ordinals": false,
          "norms": false,
          "index": true,
          "store": false,
          "type": "keyword",
          "split_queries_on_whitespace": false,
          "index_options": "freqs",
          "doc_values": true
        },
        "message": {
          "type": "text"
        },
        "nat_type": {
          "type": "keyword"
        },
        "natdst_ip": {
          "type": "ip"
        },
        "natdst_port": {
          "type": "integer"
        },
        "natsrc_ip": {
          "type": "ip"
        },
        "natsrc_port": {
          "type": "integer"
        },
        "proto": {
          "eager_global_ordinals": false,
          "norms": false,
          "index": true,
          "store": false,
          "type": "keyword",
          "split_queries_on_whitespace": false,
          "index_options": "freqs",
          "doc_values": true
        },
        "src_ip": {
          "type": "ip"
        },
        "src_mac": {
          "eager_global_ordinals": false,
          "norms": false,
          "index": true,
          "store": false,
          "type": "keyword",
          "split_queries_on_whitespace": false,
          "index_options": "freqs",
          "doc_values": true
        },
        "src_port": {
          "type": "integer"
        },
        "src_zone": {
          "type": "keyword"
        },
        "tcp_flags": {
          "type": "text"
        }
      }
    },
    "aliases": {
      "conexiones": {}
    }
  },
  "index_patterns": [
    "conn*"
  ],
  "data_stream": {
    "hidden": false,
    "allow_custom_routing": false
  },
  "composed_of": [
    "**dst_geoip**",
    "**src_geoip**",
    "registros-settings"
  ]
}

Use component templates

dst_geoip

{
  "properties": {
    "geo": {
      "type": "object",
      "properties": {
        "region_name": {
          "type": "keyword"
        },
        "city_name": {
          "type": "keyword"
        },
        "region_iso_code": {
          "type": "integer"
        },
        "timezone": {
          "type": "text"
        },
        "latitude": {
          "type": "float"
        },
        "country_name": {
          "type": "keyword"
        },
        "continent_code": {
          "type": "byte"
        },
        "location": {
          "type": "geo_point"
        },
        "country_iso_code": {
          "type": "integer"
        },
        "postal_code": {
          "type": "integer"
        },
        "longitude": {
          "type": "float"
        }
      }
    },
    "ip": {
      "index": true,
      "store": false,
      "type": "ip",
      "doc_values": true
    },
    "mmdb": {
      "type": "object",
      "properties": {
        "dma_code": {
          "type": "integer"
        }
      }
    }
  }
}


src_geoip

{
  "properties": {
    "geo": {
      "type": "object",
      "properties": {
        "region_name": {
          "type": "keyword"
        },
        "city_name": {
          "type": "keyword"
        },
        "region_iso_code": {
          "type": "integer"
        },
        "timezone": {
          "type": "text"
        },
        "latitude": {
          "type": "float"
        },
        "country_name": {
          "type": "keyword"
        },
        "continent_code": {
          "type": "byte"
        },
        "location": {
          "type": "geo_point"
        },
        "country_iso_code": {
          "type": "integer"
        },
        "postal_code": {
          "type": "integer"
        },
        "longitude": {
          "type": "float"
        }
      }
    },
    "ip": {
      "index": true,
      "store": false,
      "type": "ip",
      "doc_values": true
    },
    "mmdb": {
      "type": "object",
      "properties": {
        "dma_code": {
          "type": "integer"
        }
      }
    }
  }
}

Some time ago the mistake was that the data type had to be of a specific mathematical type. Latitude and longitude - both floats or long integers, I can't remember.

I have tried to change them from the mathematical type to the pair that make up the geo point type. Or directly let the logstash transformer treat it as a geo point... I've tried it in several ways months ago because I got desperate and left it for a while.

Does anyone have clear what would be the most normal use case and easy to use? - some example ...

thank you so much for reading this far

However in both of your templates you have

So you need to align those.

Hi warkolm, thanks for your time :wink:

I'm sorry but I don't understand your proposal.

Logstash_geo_ip_funcion

The firewall logs always collect the src_ip and dst_ip values. But the normal thing is that the geolocation function only applies to one of the two - if it is outgoing traffic - to the destination and if it is incoming traffic to the origin.

That's why I have src_geoip and dst_geoip to collect the results of the geolocation function - as the case may be. That way I use the same index to store all cases.

Sorry but I don't see the problem.

My idea is to use the same fields that the geolocation function uses in my index templates. For the function to fill in the fields.

This is the dst_geoip (for example) -

imagen

dst_geoip - has 3 objects - which I have defined based on the fields that the geolocation function uses as output - for the function to use. The geopoint object has one of those fields that you indicate - location.

If you would not use it like this - please - guide me, how would you do it to collect that information and then be able to use it on maps.

Would your proposal be to separate it into two indices?

Thank you

Your geoip filters are using a target option, but your mappings doesn't seem to have the correct field mapped.

From what you shared it is not clear what are your dst_geoip and src_geoip mappings.

Do you have something like this?

{
    "template": {
        "mappings": {
            "properties": {
                "dst_geoip": {
                    "properties": {
                        "geo" : {
                            "properties" : {
                                ...
                                "location" : { "type" : "geo_point" },
                                ...
                            }
                        }
                }
            }
        }
    }
}

If so, your mappings do not match the field you are using as the target of the geoip filter.

Using this filter:

geoip {                                                                                                                                                                                                 
    source => "dst_ip"                                                                                                                                                                                      
    target => "dst_geoip"                                                                                                                                                                                   
}  

Will create the geoip fields directly under dst_geoip, you won't have a geo nested field, you will have something like this:

  {
    "dst_geoip": {
        "country_code3": "US",
        "latitude": 37.751,
        "timezone": "America/Chicago",
        "country_name": "United States",
        "location": {
            "lat": 37.751,
            "lon": -97.822
        },
        "continent_code": "NA",
        "country_code2": "US",
        "ip": "8.8.8.8",
        "longitude": -97.822
    }
}

You have the location field under dst_geoip, so [dst_geoip][location], but there is no mapping for this field, only for [dst_geoip][geo][location].

You need to fix your mappings or change the target to be [dst_geoip][geo].

Hi Leandrojmp,

I have corrected the destination as indicated.

I have also defined it in various ways - for testing purposes.

as geopoint
As Object with two float numeric values longitude and latitude
I have not even defined it so that it creates them dynamically

...but in all cases it creates them as numeric objects.

imagen

Kibana never interprets them as geographic points-so I can't use them in his maps.

Years ago I think I corrected this problem by using a mutation or translation filter.

Please, how do you do it?

Thank you very much for your time

Hello, I'm not sure how I can help further besides of what I already commented here.

Also, it is not clear what is your current issue, the screenshot you shared is for a field that was already processed by the geoip filter, I'm not sure what is the issue with those lat and lon fields, you do not need to care about them, you do not need even to map them, the only fields that matter for the geo point to work is the location field.

The steps to have a geo point field in Elasticsearch are:

  1. Create the mapping for the field
  2. Add Geolocation data using a geoip filter or sending the data on one of the supported formats.

If you want to use the field dst_geoip as the geoip field for your index, you need to have this in your mapping before adding any document to this field, if you already added a document to this field in your index, you will need to recreate the index.

"mappings": {
    "properties": {
        "dst_geoip": {
            "properties": {
                "location" : { "type" : "geo_point" }
            }
        }
    }
}

This is the only mapping needed to have a geo point on the field dst_geoip.location

Then, in Logstash you would need the following filter:

geoip {
    source => "dst_ip"
    target => "dst_geoip"
}

This filter will create many nested fields on dst_geoip, one of them will be dst_geoip.location which will have the location data needed to plot maps in Kibana.

I suggest that you review your mappings and pipelines to see what is missing, maybe a field name is wrong.

A couple of years ago I wrote this blog post about geoip in Logstash, it is based on version 7, maybe it helps having another example.

Thank you very much for your help. It has guided me to solve the problem. Things may have changed a bit... I'll tell you what I've verified.

The object tree MUST be something like this;

dst_geoip (object) -> "geo" (object) -> "location" (geographic point)

If you put other names or types to the geo or location fields - it doesn't work.

imagen

If you let him create them dynamically it doesn't work either.

They have to be those names. Yes, the dst_geoip field can vary, as long as it is the one you use, calro , in the function call.

geoip {

     source => "dst_ip"
     target => "dst_geoip"

}

Once again, thank you so much

Ok, if you need this, then the mapping for the location field needs to be:

"mappings": {
    "properties": {
        "dst_geoip": {
            "properties": {
                "geo" : {
                    "properties" : {
                        "location" : { "type" : "geo_point" }
                    }
                }
            }
        }
    }

Which will create a map for the field:

{
    "dst_geoip": {
        "geo": {
            "location": "LOCATION DATA"
        }
    }
}

Then, in your logstash pipeline your target needs to be [dst_geoip][geo].

geoip {
    source => "dst_ip"
    target => "[dst_geoip][geo]"
}

This will create the location as a nested field under dst_geop.geo, which will match the mapping.

Hi leandro,

What I mean is that - now and from what I've been able to gather from all the tests I've done. I am sure that I have made many combinations.

The "geo" field is created by the geoip filter itself. Yes or yes.

Therefore, it is the one that must be created to receive the geo_point type values with the "location" field. So if you don't create the field you .. or create a different one - he will create it and add the results of the function in all its fields - location will be a tuple of numeric values. Unless you go ahead and create it yourself of type geo_point.

That has been my deduction after many many tests.
I may be wrong though - I would encourage you to try it on version 8.x... at least I think so.

geoip {

     source => "dst_ip"
     target => "whatever"

}

whatever (objet) - geo (objet) - location (geo-point)

Thanks again for your help :wink:

Oh, I think I understand the issue now, and you are right.

This is related to one change in Logstash version 8 that can break a lot of things.

On version 8 the ecs compatibility is enabled by default, and if this is enabled the geoip filter will automatically create the geo level on the target field, so instead of using [dst_geoip][geo] as a target, you really need to use only [dst_geoip] and the geo field and its nested fields will be created.

I explictily disabled the ecs compatibility in all my logstash, 7 and 8, to avoid breaking things, and have no plan to turn it on.

In your case, you seem to be using logstash 8, so in your target option in the geoip filter you need to specify only the top-level field dst_geoip, and then the dst_geoip.geo.location field will be created, the mapping is still the same.

Just as an example, running the geoip filter with pipeline.ecs_compatibility: disabled, will give me this:

{
          "host" => "lab",
       "message" => "8.8.8.8",
      "@version" => "1",
    "@timestamp" => 2023-02-27T21:13:34.812Z,
     "dst_field" => {
              "location" => {
            "lat" => 34.0544,
            "lon" => -118.2441
        },
           "postal_code" => "90009",
             "city_name" => "Los Angeles",
              "dma_code" => 803,
              "timezone" => "America/Los_Angeles",
                    "ip" => "8.8.8.8",
           "region_code" => "CA",
              "latitude" => 34.0544,
         "country_code3" => "US",
             "longitude" => -118.2441,
         "country_code2" => "US",
           "region_name" => "California",
        "continent_code" => "NA",
          "country_name" => "United States"
    }
}

As you can see, the fields create by the geoip filter are nested under dst_field.

If I run the same pipeline using pipeline.ecs_compatibility: v8, which is the default on Logstash 8, this is the result.

{
       "message" => "8.8.8.8",
      "@version" => "1",
     "dst_field" => {
         "geo" => {
                    "timezone" => "America/Los_Angeles",
                 "postal_code" => "90009",
             "region_iso_code" => "US-CA",
                    "location" => {
                "lon" => -118.2441,
                "lat" => 34.0544
            },
                   "city_name" => "Los Angeles",
                "country_name" => "United States",
                 "region_name" => "California",
              "continent_code" => "NA",
            "country_iso_code" => "US"
        },
          "ip" => "8.8.8.8",
        "mmdb" => {
            "dma_code" => 803
        }
    },
    "@timestamp" => 2023-02-27T21:17:48.990Z,
         "event" => {
        "original" => "8.8.8.8"
    },
          "host" => {
        "hostname" => "lab"
    }
}

The geoip filter is the same:

filter {
    geoip {
        source => "message"
        target => "dst_field"
    }
}

But with the ecs compatibility enabled, it will create the nested geo filter.

Is this the issue?

Hi leandro,

What you say makes perfect sense. By default I did not touch that option since I did not understand its scope well... neither in functions nor in the depth in each one of them. But with your example it is clear.

Thank you so much

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.