Dear all,
Any help is welcome: I have done all I could, and now I need specialists...
I have 1 logstash instance fed by 1 filebeat and outputting to 1 ES.
I get an OOM Logstash crash after a few hours. I tried to increase the Xmx up to 10G, but it still crashes.
I used Eclipse MAT to analyze the heap dump file, and here is the suspect it found... But I don't know what setting these objects represent. Can it be because we use a ruby filter? Many thanks in advance.
Here is my config:
# Input stage: accept events over the Beats protocol (the pipeline is fed by
# one Filebeat instance) on port 5044.
input {
    beats {
        port => "5044"
        # Decode incoming payloads as plain text using the UTF-8 charset.
        codec => plain {
          charset => "UTF-8"
        }
    }
}
# Replace the 'message' field with the array obtained by splitting it on /\n/,
# so that later filters see one array element per line.
# NOTE(review): this raises inside the ruby filter when 'message' is missing or
# is not a String — confirm every incoming event carries a string 'message'.
filter {
  ruby {
    code => "event.set('message', event.get('message').split(/\n/))"
  }
}
# Parse each log line, enrich the event with the external Ruby script, set the
# event date, then drop the intermediate capture fields.
filter {
  # Capture, from 'message': an ISO8601 timestamp (messageDate), a one-word
  # level (messageType), a source token (fileSrc) and the remaining text
  # (messageData).
  # NOTE(review): the trailing `[]` in the field references is unusual —
  # presumably meant to collect one value per line into an array; confirm.
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:[messageDate][]} %{WORD:[messageType][]} (?<fileSrc>.[a-zA-Z|.]+) - %{GREEDYDATA:[messageData][]}"}
  }
  ruby {
    # Path to the Ruby script whose filter(event) method reads messageType,
    # messageDate and messageData and sets fields such as 'date', 'origin',
    # 'service', 'lat' and 'lon'.
    path => "/home/elk/logstash-6.2.1/flunch_logs.rb"
  }
  # Parse the 'date' field (set by the script above) using the pattern
  # "YYYY-MM-dd HH:mm:ss,SSS". No `target` is configured here, so the
  # plugin's default target field is used.
  date {
    match => [ "date", "YYYY-MM-dd HH:mm:ss,SSS"]
  }
  # Remove the intermediate grok captures once the script has consumed them.
  mutate {
    remove_field => ["messageData", "messageDate", "messageType"]
  }
}
# Map the site code to its geolocation.
# The temporary 'lat' / 'lon' fields (set by the Ruby script as
# "<restaurant>_lat" / "<restaurant>_lon") are used as lookup keys in geo.yml.
filter {
  # Latitude: look up the value of 'lat' in the dictionary and store the
  # result in [geoip][location][lat].
  translate {
    dictionary_path => "/home/elk/logstash-6.2.1/geo.yml"
    field => "lat"
    destination => "[geoip][location][lat]"
  }
  # Longitude: same lookup for 'lon', stored in [geoip][location][lon].
  # NOTE(review): both translate filters point at the same dictionary file.
  translate {
    dictionary_path => "/home/elk/logstash-6.2.1/geo.yml"
    field => "lon"
    destination => "[geoip][location][lon]"
  }
  # Remove the temporary lookup-key fields.
  mutate { remove_field => [ "lat", "lon" ] }
}
# Output stage: every event is sent to Elasticsearch and also printed to
# stdout.
output {
  # Elasticsearch host
  elasticsearch { hosts => ["localhost:9200"] }
  # Debug only: dumps each event to stdout with the rubydebug codec.
  # NOTE(review): consider removing this outside of debugging sessions.
  stdout { codec => rubydebug }
}
And the ruby filter script (flunch_logs.rb):
require 'date'
# Parse a log timestamp such as "2018-03-01 12:34:56,789".
#
# @param date [String] timestamp formatted as '%Y-%m-%d %H:%M:%S,%L'
# @return [DateTime] the parsed timestamp, including milliseconds
# @raise [ArgumentError] when the string does not match the format
# @raise [TypeError] when +date+ is not a String (e.g. nil)
def parse_date(date)
  timestamp_format = '%Y-%m-%d %H:%M:%S,%L'
  DateTime.strptime(date, timestamp_format)
end
# Logstash ruby-filter entry point. Enriches an event from the fields captured
# upstream ('messageType', 'messageDate', 'messageData') and rewrites
# 'message' as an array of {type:, data:} hashes.
#
# Behaviour:
# * When 'messageType' is missing (nothing was captured) the event is returned
#   untouched instead of raising.
# * When 'messageType' is an Array the event is treated as a multi-line
#   message: 'state', 'restaurant', 'checkout_number', 'lat', 'lon', 'origin',
#   'mode', 'date' and 'total_time' are set.
# * Otherwise the event is a single-line message: 'origin' is 'application'
#   and 'date' is the captured date.
# * 'service' is 'lunch' when the event hour is before 16, else 'dinner'.
#
# @param event [#get, #set] the event being filtered
# @return [Array] a one-element array holding the same event
# @raise [ArgumentError] when a captured date does not match the format
#   expected by parse_date
def filter(event)
  message_type = event.get('messageType')
  message_date = event.get('messageDate')
  message_data = event.get('messageData')

  # Nothing was captured upstream: there is nothing to enrich.
  return [event] if message_type.nil?

  if message_type.is_a?(Array)
    date, result = enrich_multiline_event(event, message_type, message_date, message_data)
  else
    date = message_date
    event.set('origin', 'application')
    event.set('date', date)
    result = [{ type: message_type, data: message_data }]
  end

  parsed_date = parse_date(date) unless date.nil?
  unless parsed_date.nil?
    event.set('service', parsed_date.hour < 16 ? 'lunch' : 'dinner')
  end

  event.set('message', result)
  [event]
end

# Sets every field derived from a multi-line message and returns the first
# date together with the per-line {type:, data:} result array.
def enrich_multiline_event(event, types, dates, data)
  dates = Array(dates)
  data = Array(data)

  event.set('state', types.include?('ERROR') ? 'error' : 'info')
  set_site_fields(event)
  event.set('origin', detect_origin(data.first.to_s))

  # NOTE(review): each classified line overwrites 'mode', so the last one
  # wins and a trailing non-error line resets it to 'nominal'. This mirrors
  # the original behaviour — confirm it is intended.
  data.each do |line|
    mode = degraded_mode(line.to_s)
    event.set('mode', mode) if mode
  end

  first_date = dates.first
  last_date = dates.last
  event.set('date', first_date)

  # Total time, in milliseconds, between the first and the last line.
  if first_date && last_date
    started_at = parse_date(first_date).strftime('%Q').to_i
    finished_at = parse_date(last_date).strftime('%Q').to_i
    event.set('total_time', finished_at - started_at)
  end

  result = types.each_with_index.map do |type, index|
    { type: type, data: data[index] }
  end
  [first_date, result]
end

# Extracts the restaurant and checkout identifiers from the 'source' field.
# On a match it also sets the temporary 'lat' / 'lon' lookup keys; without a
# match (or without a 'source') both identifiers are set to nil.
def set_site_fields(event)
  match = event.get('source').to_s.match(/C([0-9]+)(FR\d+)-/)
  if match
    restaurant = match[1]
    event.set('restaurant', restaurant)
    event.set('checkout_number', match[2])
    # Temporary variables used for geolocation.
    event.set('lat', "#{restaurant}_lat")
    event.set('lon', "#{restaurant}_lon")
  else
    event.set('restaurant', nil)
    event.set('checkout_number', nil)
  end
end

# Returns the origin ('askFid', 'commitFid', 'fidbridge' or 'undefined')
# deduced from the first line of the message.
def detect_origin(first_line)
  if first_line.include?('requete')
    return 'askFid' if first_line.include?('askFid')
    return 'commitFid' if first_line.include?('commitFid')

    'undefined'
  elsif first_line.include?('fidbridge')
    'fidbridge'
  else
    'undefined'
  end
end

# Returns the mode implied by one line, or nil when the line is an askFid
# call error that matches none of the known degraded cases.
def degraded_mode(line)
  return 'nominal' unless line.include?('erreur d\'appel askFid')

  if line.include?('Aucun serveur joignable')
    'degrade_3'
  elsif line.include?('secondaire')
    'degrade_2'
  elsif line.include?('primaire')
    'degrade_1'
  end
end