Logstash 6.2 out of memory : suspects

Dear all,
Any help welcome, since I did all I could but now I need specialists...
I have 1 logstash instance fed by 1 filebeat and outputting to 1 ES.
I get an OOM logstash crash after a few hours. I tried to increase the Xmx upto 10G but it still crashes.
I used eclipse MAT to analyze de heap dump file, and here is the suspect it found... But I don't know what setting these objects represents. Can it be because we use ruby filter ? Many thanks in advance.

here is my config :

input {
    beats {
        port => "5044"
        codec => plain {
          charset => "UTF-8"
        }
    }
}
filter {
  ruby {
    code => "event.set('message', event.get('message').split(/\n/))"
  }
}
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:[messageDate][]} %{WORD:[messageType][]} (?<fileSrc>.[a-zA-Z|.]+) - %{GREEDYDATA:[messageData][]}"}
  }
  ruby {
    # path to Ruby script
    path => "/home/elk/logstash-6.2.1/flunch_logs.rb"
  }
  date {
    match => [ "date", "YYYY-MM-dd HH:mm:ss,SSS"]
  }
  mutate {
    remove_field => ["messageData", "messageDate", "messageType"]
  }
}

# Mapping site code and geolocalisation
filter {
  # Latitude
  translate {
    dictionary_path => "/home/elk/logstash-6.2.1/geo.yml"
    field => "lat"
    destination => "[geoip][location][lat]"
  }
  # Longitude
  translate {
    dictionary_path => "/home/elk/logstash-6.2.1/geo.yml"
    field => "lon"
    destination => "[geoip][location][lon]"
  }

  # Suppress  temp variables
  mutate { remove_field => [ "lat", "lon" ] }
}
output {
  # ElasticsearchHost
  elasticsearch { hosts => ["localhost:9200"] }

  # fordebug
  stdout { codec => rubydebug }
}

and the ruby filter :
require 'date'

def parse_date(date)
    return DateTime.strptime(date, '%Y-%m-%d %H:%M:%S,%L')
end

def filter(event)
    messageType = event.get('messageType')
    messageDate = event.get('messageDate')
    messageData = event.get('messageData')

    if(defined?(messageType))
        result = nil
        date = nil

        if(messageType.kind_of?(Array))
            if(messageType.include?('ERROR'))
                event.set('state', 'error')
            else
                event.set('state', 'info')
            end

            filename = event.get('source')
            results = filename.scan(/C([0-9]+)(FR\d+)-/)

            begin
                restaurant = results[0][0]
                checkout_number = results[0][1]
                event.set('restaurant', restaurant)
                event.set('checkout_number', checkout_number)

                # Variable temporaire pour geolocatisation
                event.set('lat', restaurant + '_lat')
                event.set('lon', restaurant + '_lon')
            rescue => exception
                event.set('restaurant', nil)
                event.set('checkout_number', nil)
            end

            # Origine : askFid, commitFid, fidbridge
            if(messageData[0].include?('requete'))
                if(messageData[0].include?('askFid'))
                    event.set('origin', 'askFid')
                elsif(messageData[0].include?('commitFid'))
                    event.set('origin', 'commitFid')
                else
                    event.set('origin', 'undefined')
                end
            elsif(messageData[0].include?('fidbridge'))
                event.set('origin', 'fidbridge')
            else
                event.set('origin', 'undefined')
            end

            # Degraded mode
            messageData.each do |msg|
                if(msg.include?('erreur d\'appel askFid'))
                    if(msg.include?('Aucun serveur joignable'))
                        event.set('mode', 'degrade_3')

                    elsif(msg.include?('secondaire'))
                        event.set('mode', 'degrade_2')

                    elsif(msg.include?('primaire'))
                        event.set('mode', 'degrade_1')
                    end
                else
                    event.set('mode', 'nominal')
                end
            end

            date = messageDate[0]
            event.set('date', date)

            result = (0..messageType.length-1).map do |index|
                {type: messageType[index], data: messageData[index]}
            end

            # Calcul du temps total
            first_message = parse_date(date)
            last_message = parse_date(messageDate[messageDate.length - 1])
            event.set('total_time', last_message.strftime('%Q').to_i - first_message.strftime('%Q').to_i)

        elsif
            date = messageDate
            event.set('origin', 'application')
            event.set('date', date)
            result = [{type: messageType, data: messageData}]
        end

        parsed_date = parse_date(date)
        if !parsed_date.nil? and parsed_date.strftime('%k').to_i < 16
            event.set('service', 'lunch')
        elsif !parsed_date.nil?
            event.set('service', 'dinner')
        end

        event.set('message', result)
    end

    return [event]
end

@anselme,

Which version of the beats input plugin are you using?

(To find the version of the beats input plugin, run bin/logstash-plugin list --verbose logstash-input-beats from the logstash folder)

There have been a number of improvements made to the beats input plugin recently. I would suggest upgrading to the latest version.

To do this run
bin/logstash-plugin update logstash-input-beats
from the Logstash folder.

Thanks,

Rob

Thank you very much Rob for this answer
The command you gave displays : logstash-input-beats (5.0.6)

I read in the release notes that in 5.0.9 the memory management has been improved. I give it a try.
Thanks again
Anselme

Hi Rob,

It worked,

Thanks a lot !

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.