Issue with Logstash event feed

Hello,

I have Logstash and Elasticsearch (recently updated to version 6.1.1) ingesting a feed from Snort via syslog. This has been working for more than six months, but yesterday I suddenly noticed that I am no longer getting all alerts.
Looking into the Logstash log I see the error below. Any idea what went wrong?

Please advise.
Thanks

    [2018-01-09T12:38:21,827][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"main", "exception"=>"undefined method `to_f' for [\"lon\", -122.1206]:Array\nDid you mean?  to_h\n               to_a\n               to_s", "backtrace"=>["/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:298:in `convert_float'", "org/jruby/RubyMethod.java:115:in `call'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:271:in `block in convert'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:271:in `block in convert'", "org/jruby/RubyHash.java:1343:in `each'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:261:in `convert'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:222:in `filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:145:in `do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:164:in `block in multi_filter'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:161:in `multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filter_delegator.rb:48:in `multi_filter'", "(eval):2375:in `block in initialize'", "org/jruby/RubyArray.java:1734:in `each'", "(eval):2370:in `block in initialize'", "(eval):715:in `block in filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:455:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:434:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:393:in `block in 
start_workers'"], :thread=>"#<Thread:0x2fbcf8ba@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:245 sleep>"}
[2018-01-09T12:38:21,874][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `to_f' for ["lon", -122.1206]:Array
Did you mean?  to_h
               to_a
               to_s>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:298:in `convert_float'", "org/jruby/RubyMethod.java:115:in `call'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:271:in `block in convert'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:271:in `block in convert'", "org/jruby/RubyHash.java:1343:in `each'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:261:in `convert'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-mutate-3.2.0/lib/logstash/filters/mutate.rb:222:in `filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:145:in `do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:164:in `block in multi_filter'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:161:in `multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filter_delegator.rb:48:in `multi_filter'", "(eval):2375:in `block in initialize'", "org/jruby/RubyArray.java:1734:in `each'", "(eval):2370:in `block in initialize'", "(eval):715:in `block in filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:455:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:434:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:393:in `block in start_workers'"]}

Previously, the field you are converting to float in the mutate filter was a float-like string.
Now it is an array: ["lon", -122.1206].

There are two possible explanations:

  1. Your data has changed shape.
  2. Logstash 6.1.1 is doing something different from the version you were running before.

To help you further, you need to post a lot more detail:

  1. The previous Logstash version, and the plugin versions too.
  2. Your config.
  3. A sample of the incoming data from before, and a sample of the data from now.
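An easy way to capture such a sample is a temporary debug output; the rubydebug codec pretty-prints every event so you can see the exact shape of each field (a minimal sketch, to be removed once you have the sample):

    output {
      # Dump each event in full so the field structure is visible
      stdout { codec => rubydebug }
    }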

Thanks for your response. Here is my Logstash config, if you can have a look.
Thanks

# Author: Tal Bar-Or
# Email: tbaror@dalet.com
# Last Update: 11/18/2016
#
# This conf file is based on accepting logs for snort
input {
  udp {
    port => 5142
    type => snort
  }
}
filter {
  if [host] =~ /172\.17\.37\.2/ or [host] =~ /10\.0\.8\.2/ or [host] =~ /10\.0\.11\.2/ or [host] =~ /10\.0\.10\.2/ or [host] =~ /10\.0\.12\.2/ or [host] =~ /10\.0\.14\.2/ or [host] =~ /10\.0\.15\.2/ {
    # This is the initial parsing of the log
    grok {
      match => { "message" => "\|%{SPACE}\[%{WORD:msg_source}\[%{WORD:msg}\]\:%{SPACE}\[%{GREEDYDATA:sensor_name}\]%{SPACE}\]%{SPACE}\|\|%{SPACE}%{TIMESTAMP_ISO8601:event_timestamp}%{SPACE}%{INT:event_priority}%{SPACE}\[%{INT:gid}:%{INT:sid}:%{INT:rev}\]%{SPACE}%{DATA:alert_description}\|\|%{SPACE}%{DATA:classification}%{SPACE}\|\|%{SPACE}%{INT:protocol}%{SPACE}%{IP:SrcIp}%{SPACE}%{IP:DstIp}%{SPACE}\|\|%{SPACE}%{INT:SrcPort}%{SPACE}%{INT:DstPort}%{SPACE}"}
    }
  # If you'd like to collect the DNS name for the SrcIP, keep this section. Caution: this can cause an attacker to go into hiding.
  # If you do not want reverse DNS lookups of IPs, keep this commented.
    #mutate {
    #  add_field => { "SrcIP-resolved" => "%{SrcIp}" }
    #}
    #dns {
    #  reverse => [ "[SrcIP-resolved]" ]
    #  action => "replace"
    #}

  # This will attempt to do a geoip lookup against the SrcIP
  geoip {
      source => "SrcIp"
      target => "SrcGeoip"
      database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
      add_field => [ "[SrcGeoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[SrcGeoip][location]", "%{[geoip][latitude]}"  ]
      add_tag => "geoip"
    }
    mutate {
      convert => [ "[SrcGeoip][location]", "float"]
    }
    #geoip {
    #  source => "[SrcIp]"
    #  target => "SrcGeo"

  #}


  # If you'd like to collect the DNS name for the DstIP, keep this section. Caution: this can cause an attacker to go into hiding.
  # If you do not want reverse DNS lookups of IPs, keep this commented.
    #mutate {
    #  add_field => { "DstIP-resolved" => "%{DstIp}" }
    #}
    #dns {
    #  reverse => [ "[DstIP-resolved]" ]
    #  action => "replace"
    #}


  # This will attempt to do a geoip lookup against the DstIP
  geoip {
      source => "DstIp"
      target => "DstGeoip"
      database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
      add_field => [ "[DstGeoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[DstGeoip][location]", "%{[geoip][latitude]}"  ]
      add_tag => "geoip"
    }
    mutate {
      convert => [ "[DstGeoip][location]", "float"]
    }

    #geoip {
    #  source => "[DstIp]"
    #  target => "DstGeo"

    #}



  # If the alert is a Snort GPL alert break it apart for easier reading and categorization
    if [alert_description] =~ "GPL " {
    # This will parse out the category type from the alert
      grok {
        match => { "alert_description" => "GPL\s+%{DATA:category}\s" }
      }
    # This will store the category
      mutate {
        add_field => { "rule_type" => "Snort GPL" }
        lowercase => [ "category"]
        }
    }
  # If the alert is an Emerging Threat alert break it apart for easier reading and categorization
    if [alert_description] =~ "ET " {
    # This will parse out the category type from the alert
      grok {
        match => { "alert_description" => "ET\s+%{DATA:category}\s" }
      }
    # This will store the category
      mutate {
        add_field => { "rule_type" => "Emerging Threats" }
        lowercase => [ "category"]
      }
    }
  # I recommend changing the field types below to integer so searches can do greater-than or
  # less-than comparisons, and also so math functions can be run against them
    mutate {
      convert => [ "SrcPort", "integer" ]
      convert => [ "DstPort", "integer" ]
      convert => [ "event_priority", "integer" ]
      convert => [ "protocol", "integer" ]

      remove_field => [ "message"]
    }
  # This will translate the priority field into a severity field of either High, Medium, or Low
  if [event_priority] == 1 {
      mutate {
        add_field => { "severity" => "High" }
      }
    }
    if [event_priority] == 2 {
      mutate {
        add_field => { "severity" => "Medium" }
      }
    }
    if [event_priority] == 3 {
      mutate {
        add_field => { "severity" => "Low" }
      }
    }
    # This section adds URLs to look up information about a rule online
    mutate {
      add_field => [ "ET_Signature_Info", "http://doc.emergingthreats.net/%{sid}" ]
      add_field => [ "Snort_Signature_Info", "https://www.snort.org/search?query=%{gid}-%{sid}" ]
    }


  # Protocol type detection
    if [protocol] == 17 {
      mutate {
        replace => { "protocol" => "UDP" }
      }
    }
    if [protocol] == 6 {
      mutate {
        replace => { "protocol" => "TCP" }
      }
    }
    if [protocol] == 1 {
      mutate {
        replace => { "protocol" => "ICMP" }
      }
    }
    if [protocol] == 2 {
      mutate {
        replace => { "protocol" => "IGMP" }
      }
    }

  }
}

output {
  if [msg_source] == "SNORTIDS" {
    elasticsearch {
      index => "ids_sensors"
      hosts => ["localhost:9200"]
    }
    #stdout { codec => rubydebug }

  }
}

any idea?

^^ Those add_field lines in your geoip filters are your problem.

When you add both [geoip][longitude] and [geoip][latitude] to the field [SrcGeoip][location], you create an array, because the second add_field appends to the existing value rather than overwriting it.
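Distilled from your config, this is the pattern that produces the array (a sketch of the source-side block; the destination-side block has the same issue):

    geoip {
      source => "SrcIp"
      target => "SrcGeoip"
      # The first add_field sets [SrcGeoip][location]; the second finds the
      # field already present and appends, leaving an array that the later
      # mutate convert => "float" cannot handle.
      add_field => [ "[SrcGeoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[SrcGeoip][location]", "%{[geoip][latitude]}" ]
    }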

With the latest version of the geoip plugin, the event (without the add_field settings) looks like this:

{
          "tags" => [
        [0] "geoip"
    ],
      "@version" => "1",
       "message" => "8.8.8.8",
    "@timestamp" => 2018-01-10T17:04:45.178Z,
          "host" => "Elastics-MacBook-Pro.local",
      "SrcGeoip" => {
              "location" => {
                            "lon" => -97.822,
                            "lat" => 37.751
                         },
             "longitude" => -97.822,
                    "ip" => "8.8.8.8",
          "country_name" => "United States",
              "latitude" => 37.751,
         "country_code2" => "US",
        "continent_code" => "NA",
         "country_code3" => "US"
    },
      "sequence" => 0
}

NOTE:
[SrcGeoip][longitude] is already a float.
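In other words, the current plugin already produces a {lon, lat} location object and float coordinates, so the add_field and mutate convert steps can simply be dropped. A minimal sketch of the source-side geoip block under that assumption (the DstIp block would change the same way):

    geoip {
      source   => "SrcIp"
      target   => "SrcGeoip"
      database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
      add_tag  => "geoip"
      # No add_field or convert needed: [SrcGeoip][location] is already a
      # {lon, lat} object, and [SrcGeoip][longitude] is already a float.
    }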


Thanks for the helpful post. I removed the mutate, but now I have a new issue in the Logstash log, as follows:
any idea?
Thanks

    [2018-01-12T06:54:16,863][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x7ff85f68>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"MHq36GAB8wQ4IAya8AEb", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [SrcGeoip.location] of different type, current_type [float], merged_type [text]"}}}}
[2018-01-12T06:55:39,465][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x76a4ad43>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"MXq56GAB8wQ4IAyaMgHF", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [DstGeoip.location] of different type, current_type [float], merged_type [text]"}}}}
[2018-01-12T06:59:16,043][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x405d03fe>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"Mnq86GAB8wQ4IAyagAHH", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [SrcGeoip.location] of different type, current_type [float], merged_type [text]"}}}}
[2018-01-12T07:01:56,035][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x65912a3a>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"M3q-6GAB8wQ4IAya8QG_", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [DstGeoip.location] of different type, current_type [float], merged_type [text]"}}}}
[2018-01-12T07:04:16,731][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x422536c8>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"NHrB6GAB8wQ4IAyaFwFX", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [SrcGeoip.location] of different type, current_type [float], merged_type [text]"}}}}
[2018-01-12T07:08:54,133][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x568033d>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"NXrF6GAB8wQ4IAyaUgHf", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Rejecting mapping update to [ids_sensors] as the final mapping would have more than 1 type: [doc, snort]"}}}}
[2018-01-12T07:09:16,347][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x58c5d2ee>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"NnrF6GAB8wQ4IAyaqQG3", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [SrcGeoip.location] of different type, current_type [float], merged_type [text]"}}}}
[2018-01-12T07:12:57,959][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x40cbf1cb>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"N3rJ6GAB8wQ4IAyaCwFj", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [DstGeoip.location] of different type, current_type [float], merged_type [text]"}}}}
[2018-01-12T07:14:16,373][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"ids_sensors", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x16229a5e>], :response=>{"index"=>{"_index"=>"ids_sensors", "_type"=>"doc", "_id"=>"OHrK6GAB8wQ4IAyaPQGx", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [SrcGeoip.location] of different type, current_type [float], merged_type [text]"}}}}

Did you remove the add_field pieces?

If you did, then [SrcGeoip][location] should be a JSON object. ES will still complain that the mapping can't be satisfied, because [SrcGeoip][location] is now an object and not a float (like the data before).

But ES is complaining that [SrcGeoip][location] is text, not the expected float. So my question is: how are you setting [SrcGeoip][location] to a text value?


I changed the geoip filter as follows, and it works.
Thanks

    if [SrcIp] {
      geoip {
        source => "SrcIp"
        target => "SrcGeoip.location"
        database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
        add_tag => "geoip"
      }
    }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.