Help with Logstash filter and private geoip data

I realize I should have a better understanding of Elasticsearch, indexes, mappings, Logstash, and Beats before coming here to ask for help, but I really need to put this behind me. I have spent days searching and reading about the geoip filter, the geoip-info pipeline, and creating private databases for lookups, and clearly I don't have the knowledge to make it happen. I don't need a complex solution; I need a very simple one-line statement (if possible!).

I have a simple Elasticsearch setup. Following the small-business blog post, I have configured Packetbeat to include the private GeoIP data for source and destination addresses. The configuration is simple, easy to understand, and works:

    - add_fields: 
      when.network.destination.ip: private 
      fields: 
        destination.geo.location: 
          lat: 40.7128 
          lon: -74.0060 
        destination.geo.continent_name: North America
        destination.geo.country_iso_code: US
        destination.geo.region_name: New York
        destination.geo.region_iso_code: US-NY
        destination.geo.city_name: New York City
      target: '' 

I would like to take that same approach using a Logstash filter, mutate, or anything else that would work, and add the private GeoIP data for source and destination. Could someone please show me how to complete this config file so that it does the same as the above? Here is my Logstash config:

    filter {
      if [destination][ip] =~ /^172\.31/ {
        mutate { add_field => { "destination.geo.city_name" => "New York City" } }
        mutate { add_field => { "destination.geo.continent_name" => "North America" } }
        mutate { add_field => { "destination.geo.country_iso_code" => "US" } }
        mutate { add_field => { "destination.geo.region_iso_code" => "US-NY" } }
        mutate { add_field => { "destination.geo.region_name" => "New York" } }
        mutate { add_field => { WHAT GOES HERE } }
      }
    } 

Obviously, once I can get Logstash to correctly modify the source and destination information, I can remove all of these settings at the Beat level. I hope that someone can complete the configuration file above; I can't imagine there isn't a way to populate the same data the Beat does. Any help would be greatly appreciated.

Something like this may help:

    filter {
      if [destination][ip] =~ /^172\.31/ {
        mutate { add_field => { "[destination][geo][city_name]" => "New York City" } }
        mutate { add_field => { "[destination][geo][continent_name]" => "North America" } }
        mutate { add_field => { "[destination][geo][country_iso_code]" => "US" } }
        mutate { add_field => { "[destination][geo][region_iso_code]" => "US-NY" } }
        mutate { add_field => { "[destination][geo][region_name]" => "New York" } }
        mutate { add_field => { "[destination][geo][location][lat]" => "40.7128" } }
        mutate { add_field => { "[destination][geo][location][lon]" => "-74.0060" } }
      }
    }
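
As an aside, if destination.geo.location is mapped as geo_point (the standard Beats index templates map it that way), Elasticsearch also accepts a single "lat,lon" string for geo_point values, so an alternative sketch under that assumption needs only one add_field and no float conversion:

    filter {
      if [destination][ip] =~ /^172\.31/ {
        # geo_point fields accept a "lat,lon" string, so a single add_field works
        mutate { add_field => { "[destination][geo][location]" => "40.7128,-74.0060" } }
      }
    }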

Thank you for your response. I have tried what you suggested, which yields the following in the Logstash logs:

{"type"=>"mapper_parsing_exception", 
"reason"=>"failed to parse field [destination.geo.location] of type [geo_point]", 
"caused_by"=>{"type"=>"parse_exception", "reason"=>"longitude must be a number"}}}}}

You can convert them to float:

    mutate { convert => ["[destination][geo][location][lat]", "float"] }
    mutate { convert => ["[destination][geo][location][lon]", "float"] }
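
One detail worth knowing: the conversions have to live in their own mutate block (or a later one). Within a single mutate, convert is applied before add_field, so converting in the same block would run before the fields exist. A minimal sketch of the combined filter, assuming the same condition as above:

    filter {
      if [destination][ip] =~ /^172\.31/ {
        # add_field is applied after a mutate's other operations,
        # so the convert calls live in separate, later mutate blocks
        mutate { add_field => { "[destination][geo][location][lat]" => "40.7128" } }
        mutate { add_field => { "[destination][geo][location][lon]" => "-74.0060" } }
        mutate { convert => ["[destination][geo][location][lat]", "float"] }
        mutate { convert => ["[destination][geo][location][lon]", "float"] }
      }
    }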

I really appreciate your response. I added that, but I get the same result:

"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [destination.geo.location] of type [geo_point]", "caused_by"=>{"type"=>"parse_exception", "reason"=>"longitude must be a number"}}}}}

Strange 🙂
May I ask if you can share your complete Logstash configuration and index mapping?
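
In the meantime, a temporary stdout output will show the exact event Logstash emits, which helps confirm whether lat and lon actually leave Logstash as numbers. A minimal sketch, added alongside your existing elasticsearch output:

    output {
      # print each event to the console while debugging
      stdout { codec => rubydebug }
    }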

Sure. Here is the Logstash config:

    input {
      beats {
        port => 5044
      }
    }

    filter {
      if [destination][ip] =~ /^172\.31/ {
        mutate { add_field => { "destination.geo.city_name" => "Columbus" } }
        mutate { add_field => { "destination.geo.continent_name" => "North America" } }
        mutate { add_field => { "destination.geo.country_iso_code" => "US" } }
        mutate { add_field => { "destination.geo.country_name" => "United States" } }
        mutate { add_field => { "destination.geo.region_iso_code" => "US-OH" } }
        mutate { add_field => { "destination.geo.region_name" => "Ohio" } }
        mutate { add_field => { "destination.geo.name" => "AWS Ohio Datacenter" } }
        mutate { add_field => { "[destination][geo][location][lat]" => "40.7128"} }
        mutate { add_field => { "[destination][geo][location][lon]" => "-74.0060"} }
        mutate {convert => ["[destination][geo][location][lat]", "float"]}
        mutate {convert => ["[destination][geo][location][lon]", "float"]}
      }
    }

    output {
      elasticsearch {
        hosts => ["http://node1:9200","http://node2:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        user => "some_user"
        password => "somepassword"
        pipeline => "geoip-info"
      }
    }

Thanks again. If I remove all of the filter statements at the Logstash level and add them to each individual Beat configuration (as I posted originally), everything works as it should. I'm just trying to avoid updating hundreds of Beat configs across multiple machines on a private network.

I tried your configuration as follows for testing; I didn't get any errors and was able to view the flow in the SIEM without any issues:

    input {
      beats {
        port => 5044
      }
    }

    filter {
      if [destination][ip] =~ /^255\.255/ {
        mutate {
          add_field => { "[destination][geo][city_name]" => "Casablanca" }
          add_field => { "[destination][geo][continent_name]" => "Africa" }
          add_field => { "[destination][geo][country_iso_code]" => "MA" }
          add_field => { "[destination][geo][country_name]" => "Morocco" }
          add_field => { "[destination][geo][region_iso_code]" => "AF-NO" }
          add_field => { "[destination][geo][region_name]" => "North Africa" }
          add_field => { "[destination][geo][name]" => "Casablanca Ain Diab Datacenter" }
          add_field => { "[destination][geo][location][lat]" => "33.56756115608338" }
          add_field => { "[destination][geo][location][lon]" => "-7.628037574169903" }
        }

        mutate {
          convert => ["[destination][geo][location][lat]", "float"]
          convert => ["[destination][geo][location][lon]", "float"]
        }
      }
    }

    output {
      elasticsearch {
        hosts => ["https://localhost:9200"]
        user => "elastic"
        password => "changeme"
        cacert => "/dev/es/elasticsearch/config/certs_ssl/node01/node01.crt"
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        action => "index"
        ssl => true
        pipeline => "geoip-info"
      }
    }
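
One visible difference from your earlier filter is the bracket notation throughout. In Logstash, a quoted dotted key such as "destination.geo.city_name" in add_field creates a single field whose name literally contains dots rather than the nested [destination][geo][city_name] structure the Beats fields use, so bracket references are the safer choice. If you want to verify the enrichment without a live Beat, you can also test with the generator input and a stdout output; a minimal sketch, where the IP and coordinates are placeholders:

    input {
      generator {
        count   => 1
        message => "test"
        # hypothetical private address, chosen to match the condition below
        add_field => { "[destination][ip]" => "255.255.0.10" }
      }
    }

    filter {
      if [destination][ip] =~ /^255\.255/ {
        mutate { add_field => { "[destination][geo][location][lat]" => "33.567561" } }
        mutate { add_field => { "[destination][geo][location][lon]" => "-7.628037" } }
        mutate { convert => ["[destination][geo][location][lat]", "float"] }
        mutate { convert => ["[destination][geo][location][lon]", "float"] }
      }
    }

    output {
      stdout { codec => rubydebug }
    }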

Genius! Thank you so much for helping me through that. I've made the changes and it's working without errors. Hopefully it will yield the results I wanted from the beginning. Again, thank you!
