Create a custom GeoIP database for Logstash 5.2

Hi, in older versions of Logstash it was easy to create a custom .dat file from a CSV in order to have a private-IP GeoIP database.

The CSV looks like this:

startIpNum,endIpNum,country,region,city,postalCode,latitude,longitude,metroCode,areaCode
168561408,168561663,FR,AM,"my city bla bla bla",06200,43.667509,7.213238,,

and I used to create my .dat file with this command:
python2 csv2dat.py -w /my_path/mmcity.dat mmcity GeoLiteCity-custom.csv

Now my .dat file no longer works in Logstash. After some searching on the Elastic forum, it seems that the geoip filter now uses a GeoIP2 database. I saw on the official site that the file should look like this (CSV):

geoname_id,startIpNum,endIpNum,locale_code,continent_code,continent_name,country_iso_code,country_name,subdivision_1_iso_code,subdivision_1_name,subdivision_2_iso_code,subdivision_2_name,city_name,postalCode,latitude,longitude,metroCode,time_zone
2968533,168561408,168561663,fr,EU,Europe,FR,France,PAC,"Provence-Alpes-Côte d'Azur",6,"Alpes-Maritimes","my city bla bla bla",06200,43.667509,7.213238,,Europe/Paris

How can I build the new database file? The old tools do not support that kind of document structure, and I have found nothing that works.

Thanks for your help

Fayce

Logstash 5.x uses the GeoIP2 database format. Any tool you use to build or edit a database will also have to support GeoIP2.

Hi Aaron, thanks for your reply. I use mmutils for building .dat files.

There is no newer version of it, and I am looking for something as easy as csv2dat.

Thank you for your attention

Fayce

I changed the extension of my custom file from *.dat to *.mmdb.

Now the error is in my conf file; I don't know what to put in the source option of the geoip filter.

Here is a sample of my event log:

{
  "_index": "logstash-security-2017.03.22",
  "_type": "wineventlog",
  "_id": "AVr1COd23DUwVZ8syxA6",
  "_score": null,
  "_source": {
    "computer_name": "VMxxxxDC.company.fr",
    "process_id": 620,
    "keywords": [
      "Audit Success"
    ],
    "level": "Information",
    "log_name": "Security",
    "record_number": "2343599933",
    "event_data": {
      "ProcessName": "-",
      "LogonGuid": "{6823A8C7-1FF6-3D97-7BE9-BCEE2D}",
      "LogonType": "3",
      "IpPort": "54313",
      "SubjectLogonId": "0x0",
      "TransmittedServices": "-",
      "KeyLength": "0",
      "LmPackageName": "-",
      "TargetLogonId": "0x1408bb25f",
      "SubjectUserName": "-",
      "IpAddress": "10.13.38.45",
      "SubjectDomainName": "-",
      "ImpersonationLevel": "%%1833",
      "ProcessId": "0x0",
      "TargetUserName": "N133973",
      "LogonProcessName": "Kerberos",
      "TargetDomainName": "DOMAIN",
      "SubjectUserSid": "S-1-0-0",
      "AuthenticationPackageName": "Kerberos",
      "TargetUserSid": "S-1-5-21-117609710-1482476501-18016745317"
    },
    "message": "An account was successfully logged on.

My conf file (filter section):

filter {
  if "[event_data][TargetUserName]" =~ /([a-z][A-Z][0-9]{3,7})/ {
    translate {
      dictionary_path => "/etc/logstash/mutate/ExportADLDS.yml"
      field => "[event_data][TargetUserName]"
      destination => "[NCADisplayName]"
    }
  }
  if [type] == "wineventlog" {
    grok {
      match => { "message" => "%{DATA:ProcessName} %{DATA:LogonGuid} %{DATA:LogonType}  %{DATA:IpPort} %{DATA:SubjectLogonId} %{DATA:TransmittedServices} %{DATA:KeyLength} %{DATA:LmPackageName} %{DATA:TargetLogonId} %{DATA:SubjectUserName} %{IPV4:IpAddress} %{DATA:SubjectDomainName} %{DATA:ImpersonationLevel} %{DATA:ProcessId} %{DATA:TargetUserName} %{DATA:LogonProcessName} %{WORD:TargetDomainName} %{DATA:SubjectUserSid} %{WORD:AuthenticationPackageName} %{DATA:TargetUserSid}" }
    }
    geoip {
      source => "IpAddress"
      database => "/etc/logstash/geoip/mmcity.mmdb"
      fields => [ "startIpNum", "endIpNum", "country", "region", "city", "postalCode", "latitude", "longitude" ]
      target => "geoip.location"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
}

And the error in the Logstash log:

[2017-03-22T08:58:50,914][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/etc/logstash/geoip/mmcity.mmdb"}
[2017-03-22T08:58:50,931][ERROR][logstash.filters.geoip   ] The GeoLite2 MMDB database provided is invalid or corrupted. {:exception=>com.maxmind.db.InvalidDatabaseException: Could not find a MaxMind DB metadata marker in this file (mmcity.mmdb). Is this a valid MaxMind DB file?, :field=>"IpAddress"}
[2017-03-22T08:58:50,938][ERROR][logstash.pipeline        ] Error registering plugin {:plugin=>"#<LogStash::FilterDelegator:0x47aac738 @id=\"363eb8e033e6d0c9b4f8087102550357e3f8e024-7\", @klass=LogStash::Filters::GeoIP, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x37fc3fab @metric=#<LogStash::Instrument::Metric:0x377864b @collector=#<LogStash::Instrument::Collector:0x659ba59 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x175b2ead @store=#<Concurrent::Map:0x418c5a8a @default_proc=nil>, @structured_lookup_mutex=#<Mutex:0x7ef6c92e>, @fast_lookup=#<Concurrent::Map:0x35b63fdb @default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :filters, :\"363eb8e033e6d0c9b4f8087102550357e3f8e024-7\", :events]>, @logger=#<LogStash::Logging::Logger:0x58ad0c7c @logger=#<Java::OrgApacheLoggingLog4jCore::Logger:0x569a0501>>, @filter=<LogStash::Filters::GeoIP source=>\"clientip\", database=>\"/etc/logstash/geoip/mmcity.mmdb\", fields=>[\"startIpNum\", \"endIpNum\", \"country\", \"region\", \"city\", \"postalCode\", \"latitude\", \"longitude\"], target=>\"geoip.location\", add_field=>{\"[geoip][coordinates]\"=>[\"%{[geoip][longitude]}\", \"%{[geoip][latitude]}\"]}, id=>\"363eb8e033e6d0c9b4f8087102550357e3f8e024-7\", enable_metric=>true, periodic_flush=>false, cache_size=>1000, lru_cache_size=>1000, tag_on_failure=>[\"_geoip_lookup_failure\"]>>", :error=>"Could not find a MaxMind DB metadata marker in this file (mmcity.mmdb). Is this a valid MaxMind DB file?"}
[2017-03-22T08:58:51,157][ERROR][logstash.agent           ] Pipeline aborted due to error {:exception=>com.maxmind.db.InvalidDatabaseException: Could not find a MaxMind DB metadata marker in this file (mmcity.mmdb). Is this a valid MaxMind DB file?, :backtrace=>["com.maxmind.db.Reader.findMetadataStart(com/maxmind/db/Reader.java:278)", "com.maxmind.db.Reader.<init>(com/maxmind/db/Reader.java:129)", "com.maxmind.db.Reader.<init>(com/maxmind/db/Reader.java:116)", "com.maxmind.geoip2.DatabaseReader.<init>(com/maxmind/geoip2/DatabaseReader.java:37)", "com.maxmind.geoip2.DatabaseReader.<init>(com/maxmind/geoip2/DatabaseReader.java:27)", "com.maxmind.geoip2.DatabaseReader$Builder.build(com/maxmind/geoip2/DatabaseReader.java:133)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)", "RUBY.register(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-filter-geoip-4.0.4-java/lib/logstash/filters/geoip.rb:143)", "RUBY.suppress_all_warnings(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-filter-geoip-4.0.4-java/lib/logstash/filters/geoip.rb:21)", "RUBY.register(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-filter-geoip-4.0.4-java/lib/logstash/filters/geoip.rb:130)", "RUBY.register(/usr/share/logstash/vendor/jruby/lib/ruby/1.9/forwardable.rb:201)", "RUBY.register_plugin(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:282)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:293)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:293)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:303)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:232)", "RUBY.start_pipeline(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:387)", "java.lang.Thread.run(java/lang/Thread.java:745)"]}
[2017-03-22T08:58:51,207][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}

I'm not sure just changing the extension will fix your database. Please see http://dev.maxmind.com/geoip/geoip2/downloadable/

I do not see mmutils in this list. It still appears to be a v1 tool.
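
If you want to sanity-check whether a file really is a GeoIP2/MMDB database before pointing Logstash at it, the mmdblookup tool that ships with libmaxminddb can read that format; if it cannot open the file, the geoip filter won't either. Something like this should make it obvious (assuming the tool is installed, and using one of your private IPs):

# Try a lookup directly against the supposed MMDB file.
mmdblookup --file /etc/logstash/geoip/mmcity.mmdb --ip 10.13.38.45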

Hi Aaron, after 2 days of research it seems very hard to build a custom GeoIP2 database. So I will try to add the geoip fields from a CSV instead, but I think something is missing.

CSV file (it has 400 entries):

startIp,endIp,country,region,city,postalCode,latitude,longitude
10.12.11.*,10.12.11.255,FR,AM,"address city 1",06200,43.667509,7.213238
10.50.219.*,10.50.219.255,FR,AM,"address city 2",06200,43.667509,7.213238
10.12.10.*,10.12.10.255,FR,AM,"address city 3",06200,43.667509,7.213238

My filter file:

filter {
  csv {
    source => "/etc/logstash/mutate/nca.csv"
    separator => ","
    columns => [ "startIp", "endIp", "country", "region", "city", "postalCode", "latitude", "longitude" ]
    add_tag => [ "csv_parse_successfull" ]
    add_field => { "temp_longitude" => "%{longitude}" }
    add_field => { "temp_latitude" => "%{latitude}" }
  }
  if "[event_data][IpAddress]" == "startIp" {
    mutate {
      convert => { "temp_longitude" => "float" }
      convert => { "temp_latitude" => "float" }
    }
    mutate {
      rename => { "temp_longitude" => "[geoip][longitude]" }
      rename => { "temp_latitude" => "[geoip][latitude]" }
    }
  }
}

Nothing happens: no tag is added, and no geoip.longitude / geoip.latitude fields appear.

The event log is the same sample I posted above, with the IPv4 address in the nested field [event_data][IpAddress].

Thank you for your help and advice

Fayce

You might be able to use the translate filter to load a dictionary from file with your data.
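
Roughly (untested, and the dictionary path and destination field below are just placeholders), the filter loads a YAML dictionary from disk, looks up the value of a field in it, and writes the result into a destination field:

translate {
  dictionary_path => "/etc/logstash/mutate/my_geo_lookup.yml"
  field           => "[event_data][IpAddress]"
  destination     => "[geo_lookup]"
}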

Thanks Christian, I already use a translate filter to add a field to the event (in my case, adding "name, first name" from a user registration number).

I don't know how to use the translate filter to add at least 3 fields:

latitude as [geoip][latitude]
longitude as [geoip][longitude]
[geoip][location], maybe by using the mutate filter's join option

You can set the value to be a JSON or CSV formatted string, and then use either a json or csv filter to parse it out of the destination field once this has been populated by the translate filter.

If you have sample code or an example, I would really appreciate it.

Thank you Christian

I tried this:

filter {
  if "[event_data][IpAddress]" == "(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])" {
    translate {
      dictionary_path => "/etc/logstash/mutate/custom_city.csv"
      field => "[event_data][IpAddress]"
      add_field => { "[geoip][longitude]" => "%{longitude}" }
      add_field => { "[geoip][latitude]" => "%{latitude}" }
      add_field => { "[geoip][location]" => "%{longitude},%{latitude}" }
    }
    mutate {
      convert => { "[geoip][longitude]" => "float" }
      convert => { "[geoip][latitude]" => "float" }
    }
  }
}

I can see in the Logstash log (in debug) that Logstash only loads the first two columns of my CSV file:

LogStash::Filters::Translate: Dictionary -  {:dictionary=>{"startIp"=>"endIp", "10.12.11.*"=>"10.12.11.255",etc...
LogStash::Filters::Translate: Dictionary translation method - Exact

How do I specify columns in the translate filter like in the csv filter?

Thank you

It might be easier to change the format of the lookup file to YAML and have each key point to a string that is a correctly formatted JSON object, as described here (JSON) or here (csv), which were found through a simple search of the forum.

Hi Christian. I tried something simple. I use 2 YAML files: latitude.yml and longitude.yml.

latitude.yml
10.22.33.*: "43.701535"
10.13.33.*: "43.718560"
10.23.33.*: "43.718560"
10.12.96.*: "43.678237"

longitude.yml
10.22.33.*: "7.281819"
10.13.33.*: "7.265417"
10.23.33.*: "7.265417"
10.12.96.*: "7.228675"

And my filter conf:

filter {
  if "[event_data][IpAddress]" =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/ {
    translate {
      dictionary_path => "/etc/logstash/mutate/nca-latitude.yml"
      field => "[event_data][IpAddress]"
      destination => "[geoip][latitude]"
      override => true
    }
    translate {
      dictionary_path => "/etc/logstash/mutate/nca-longitude.yml"
      field => "[event_data][IpAddress]"
      destination => "[geoip][longitude]"
    }
    mutate {
      convert => { "[geoip][longitude]" => "float" }
      convert => { "[geoip][latitude]" => "float" }
    }
  }
}

I want to match the IP in my field [event_data][IpAddress] against one of my YAML files and add [geoip][longitude] and [geoip][latitude].

I don't know if the star at the end of each IP is the right thing to do (the * should match 0 to 255).

Thanks for your help and advice

That is not what I meant. You can simplify it a bit, as shown in the following simple example.

I created a small translation file named jsontranslate.yml:

'10.22.33.*': '{"geoip": {"latitude": 43.701535, "longitude": 7.281819}}'
'10.13.33.*': '{"geoip": {"latitude": 43.718560, "longitude": 7.265417}}'

This is used in the following simple config file, which assumes the message contains just the IP address:

input { stdin {} }

filter {
  translate {
    regex => true
    dictionary_path => "./jsontranslate.yml"
    field => "message"
  }

  json {
    source => "translation"
  }
}

output { stdout { codec => rubydebug } }

You should be able to expand on this to automatically populate the geoip information without having to do all the copying and mutating. It gives the following result when run:

$ echo 10.22.33.44 | logstash -f ./jsontranslate.conf 
[2017-03-28T08:20:37,252][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-03-28T08:20:37,260][INFO ][logstash.pipeline        ] Pipeline main started
[2017-03-28T08:20:37,308][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{
     "@timestamp" => 2017-03-28T07:20:37.253Z,
          "geoip" => {
         "latitude" => 43.701535,
        "longitude" => 7.281819
    },
       "@version" => "1",
    "translation" => "{\"geoip\": {\"latitude\": 43.701535, \"longitude\": 7.281819}}",
        "message" => "10.22.33.44",
           "tags" => []
}
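
Since your CSV has around 400 entries, you probably do not want to write that YAML file by hand. A rough, untested sketch of a conversion script (assuming the startIp,endIp,country,region,city,postalCode,latitude,longitude layout with a header row that you posted, and the file names nca.csv and jsontranslate.yml) could look like this:

import csv
import json

# Turn each CSV row into one translate-dictionary line,
# using the startIp wildcard (e.g. 10.12.11.*) as the lookup key.
with open('nca.csv') as src, open('jsontranslate.yml', 'w') as dst:
    for row in csv.DictReader(src):
        value = json.dumps({'geoip': {'latitude': float(row['latitude']),
                                      'longitude': float(row['longitude'])}})
        dst.write("'%s': '%s'\n" % (row['startIp'], value))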

THANK YOU Christian, it works!!!

I now have my geoip.latitude and geoip.longitude fields.

One last question, please: geoip.location is empty in Kibana, with the message "Analysis is not available for geo fields."

Should I add something to my filter conf?

Thank you again for all your help

I do not have it in front of me right now, but I recall the geoip filter also generates a location field that contains an array of latitude and longitude values, which is mapped as a geo point in Elasticsearch. Run a record through the geoip filter to be sure, and add this to the JSON document in the translate filter file.
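
If you also want to confirm that Elasticsearch has actually mapped that field as a geo_point in your index (which it needs to be for Kibana to treat it as a geo field), something along these lines should show you the mapping (untested, adjust the index name to one of yours):

curl -XGET 'localhost:9200/logstash-security-2017.03.22/_mapping/field/geoip.location?pretty'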

Something like this? Must I duplicate the longitude and latitude fields?

10.22.33.*': '{"geoip": {"latitude": 43.701535, "longitude": 7.281819, "location": "43.701535,7.281819"}}'

I was thinking more of something like this (not tested):

10.22.33.*': '{"geoip": {"latitude": 43.701535, "longitude": 7.281819, "location": [7.281819, 43.701535]}}'

I believe the format of the location field is [longitude, latitude], but you should verify this.

Thank you Christian, everything works great. You are a genius :slight_smile:

Last thing please: I found a mistake with my IPs.

If in my YAML file I have these 2 IPs:

'10.12.4.*': '{"geoip": {"latitude": 43.667805, "longitude": 7.213004, "location": [7.213004, 43.667805]}}'
'10.12.49.*': '{"geoip": {"latitude": 43.698512, "longitude": 7.278436,  "location": [7.278436, 43.698512]}}'

When I look up, for example, '10.12.49.18' (my second line), Logstash adds the values from my first line.

I tried to use a regex, but nothing happens:

input { stdin {} }

filter {
  if [message] =~ /^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$/ {
    translate {
      exact => true
      regex => true
      dictionary_path => "/etc/logstash/mutate/nca-geo.yml"
      field => "message"
    }

    json {
      source => "translation"
    }
  }
}

output { stdout { codec => rubydebug } }

Thank you