Help - Custom GeoIP Target / Updating Template/Index

Hi,

I'm a beginner and have been trying to use the geoip function in logstash to allow me to visualise the IP addresses being blocked by my home firewall.

However, I've now run into the problem that a number of people have previously posted about relating to using a custom target (in my example geoip_src and geoip_dst) and those being mapped incorrectly as "text" types.

I've seen in this thread that I have to create a new index which maps the relevant fields correctly. However, I'm struggling to work out exactly how to do that. Does anyone know if there's a really basic step-by-step guide for making these changes?

@marcasino Can you share your logstash configuration?

If the index was created with the fields being mapped as text, you may need to update the mapping to be the proper type and then reindex.
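If you want to confirm how those custom target fields were actually mapped, you can ask Elasticsearch for just the relevant field mappings (the index and field names here are assumptions based on your description):

    GET /logstash/_mapping/field/geoip_src.location,geoip_dst.location

If location comes back as float/text sub-fields rather than geo_point, Kibana's map visualisations won't be able to use it.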

BEGINS

        host => "127.0.0.1"
        port => 10514
        codec => "json"
        type => "rsyslog"
      }
    }

    filter {
      if "[BLOCKED -" in [message] {
        grok {
          match => { "message" => "%{SYSLOG5424SD:Inbound_Outbound} (?:.*) SRC=%{IPV4:Source_IP} DST=%{IPV4:Destination_IP} (?:.*) PROTO=%{WORD:Protocol} SPT=%{INT:Source_Port} DPT=%{INT:Destination_Port} (?:.*)" }
        }
        geoip {
          source => "Source_IP"
          target => "[geoip_src]"
        }
        geoip {
          source => "Destination_IP"
          target => "[geoip_dst]"
        }
      }
      if "[UFW BLOCK]" in [message] {
        grok {
          # note: the square brackets are escaped so grok treats them as
          # literal characters rather than a regex character class
          match => { "message" => "\[UFW BLOCK\] (?:.*) SRC=%{IPV4:Source_IP} DST=%{IPV4:Destination_IP} (?:.*) PROTO=%{WORD:Protocol} SPT=%{INT:Source_Port} DPT=%{INT:Destination_Port} (?:.*)" }
        }
        geoip {
          source => "Source_IP"
          target => "[geoip_src]"
        }
        geoip {
          source => "Destination_IP"
          target => "[geoip_dst]"
        }
      }
      if "query[" in [message] {
        grok {
          match => { "message" => "%{JAVACLASS:Domain_Requested} from %{IPV4:Request_From_IP_Address}" }
        }
      }
      if "gravity blocked" in [message] {
        grok {
          match => { "message" => "gravity blocked %{JAVACLASS:Domain_Blocked} is 0.0.0.0" }
        }
      }
    }

    output {
      if [type] == "rsyslog" {
        elasticsearch {
          hosts => [ "127.0.0.1:9200" ]
        }
      }
    }

ENDS

Posted the logstash config above.

I think you're right about having to update the mapping and then reindex. My question is: how do you correctly remap and reindex?

@marcasino Check out this example on another post. It's a similar case, so you should be able to walk through it and adapt it to your situation. It will basically reindex to a new index with the correct mapping. Then you can update your logstash config to start indexing into the new index.

@corey.robertson Thanks for that! Was really useful.

I've followed the example but on the last step (reindex) I get the following error:

    {
      "index" : "logstash_new",
      "type" : "_doc",
      "id" : "CUBb5HUBAXq168Vsitkz",
      "cause" : {
        "type" : "mapper_parsing_exception",
        "reason" : "failed to parse field [geoip_src_fixed.location] of type [geo_point]",
        "caused_by" : {
          "type" : "parse_exception",
          "reason" : "[lat] and [lon] must be valid double values",
          "caused_by" : {
            "type" : "number_format_exception",
            "reason" : "empty String"
          }
        }
      },
      "status" : 400
    }

I'm not entirely sure exactly what's going wrong, but based on the error it seems the [geoip_src_fixed.location] field is receiving an empty value. Any help to work out what I'm doing wrong would be appreciated.

To help, the original mapping of the logstash index includes:

    {
      "logstash" : {
        "mappings" : {
          "properties" : {
            "geoip_dst" : {
              "properties" : {
                "location" : {
                  "properties" : {
                    "lat" : {
                      "type" : "float"
                    },
                    "lon" : {
                      "type" : "float"
                    }
                  }
                }
              }
            }
          }
        }
      }
    }

I have therefore created the following mapping:

    PUT /logstash_new
    {
      "mappings": {
        "properties": {
          "geoip_src_fixed" : {
            "properties": {
              "location" : {
                "type" : "geo_point"
              }
            }
          },
          "geoip_dst_fixed" : {
            "properties": {
              "location" : {
                "type" : "geo_point"
              }
            }
          }
        }
      }
    }
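To double-check that the new index actually has the geo_point type before reindexing into it, you can read the mapping back:

    GET /logstash_new/_mapping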

And to create the ingest pipeline I have then used:

    PUT /_ingest/pipeline/convert_geo
    {
      "processors": [
        {
          "set": {
            "field": "geoip_src_fixed.location.lat",
            "value": "{{geoip_src.location.lat}}"
          }
        },
        {
          "set": {
            "field": "geoip_src_fixed.location.lon",
            "value": "{{geoip_src.location.lon}}"
          }
        },
        {
          "set": {
            "field": "geoip_dst_fixed.location.lat",
            "value": "{{geoip_dst.location.lat}}"
          }
        },
        {
          "set": {
            "field": "geoip_dst_fixed.location.lon",
            "value": "{{geoip_dst.location.lon}}"
          }
        }
      ]
    }
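For reference, an ingest pipeline like this can be dry-run against a sample document with the simulate API before committing to a full reindex (the coordinate values below are made up for illustration):

    POST /_ingest/pipeline/convert_geo/_simulate
    {
      "docs": [
        {
          "_source": {
            "geoip_src": { "location": { "lat": 51.5, "lon": -0.1 } },
            "geoip_dst": { "location": { "lat": 40.7, "lon": -74.0 } }
          }
        }
      ]
    }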

And finally I run the following reindex command, which throws the above error:

    POST _reindex/
    {
      "source": {
        "index": "logstash"
      },
      "dest": {
        "pipeline": "convert_geo",
        "index": "logstash_new"
      }
    }

Hmm, can you confirm that there are no docs that have an empty string for those fields?

You might also want to put an if clause on the processors to filter out empty strings, if that's the case: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html

Hi Corey,

Once again, thanks for that! Really appreciating the help.

There are documents which contain empty strings, so I updated the ingest pipeline to the following, which seems to have worked:

    PUT /_ingest/pipeline/convert_geo
    {
      "processors": [
        {
          "set": {
            "if": "ctx.geoip_src?.location?.lat != null",
            "field": "geoip_src.location_fixed.lat",
            "value": "{{geoip_src.location.lat}}"
          }
        },
        {
          "set": {
            "if": "ctx.geoip_src?.location?.lon != null",
            "field": "geoip_src.location_fixed.lon",
            "value": "{{geoip_src.location.lon}}"
          }
        },
        {
          "set": {
            "if": "ctx.geoip_dst?.location?.lat != null",
            "field": "geoip_dst.location_fixed.lat",
            "value": "{{geoip_dst.location.lat}}"
          }
        },
        {
          "set": {
            "if": "ctx.geoip_dst?.location?.lon != null",
            "field": "geoip_dst.location_fixed.lon",
            "value": "{{geoip_dst.location.lon}}"
          }
        }
      ]
    }

Apologies, but I do have another follow-up question. Is there a way to have this ingest pipeline run continually, so that as new documents come in through logstash they are also fed into logstash_new? The problem I'm facing right now is that only the documents received prior to running the reindex appear in logstash_new.

What I'm picturing in my head is:

rsyslog->logstash->convert_geo(pipeline)->logstash_new

Appreciate that I'm probably doing something very basic very wrong here!

A couple of options I think you can investigate:

  1. Your logstash output is defaulting to the logstash index. You could change that to use the new index instead. https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-index

  2. Now that you have an index with the correct schema, you could try deleting the original logstash index, and reindex the new one back to the original (and you shouldn't need a pipeline for that). Then new docs coming in should have the correct geopoint field type.
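For the first option, the output section would look something like this (the index name and the pipeline setting are assumptions based on what you've posted above; the pipeline option tells Elasticsearch to run your convert_geo ingest pipeline on each newly indexed doc):

    output {
      if [type] == "rsyslog" {
        elasticsearch {
          hosts => [ "127.0.0.1:9200" ]
          index => "logstash_new"
          pipeline => "convert_geo"
        }
      }
    }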

Corey,

You sir are a hero! Managed to get it working by using the first method you suggested.

Thank you so much!
