Dear all,
Any help welcome, since I did all I could but now I need specialists...
I have 1 logstash instance fed by 1 filebeat and outputting to 1 ES.
I get an OOM Logstash crash after a few hours. I tried increasing Xmx up to 10G, but it still crashes.
I used Eclipse MAT to analyze the heap dump file, and here is the suspect it found... But I don't know what setting these objects represent. Could it be because we use the ruby filter? Many thanks in advance.
Here is my config:
# Accept events from Beats shippers on port 5044; payloads are decoded
# by the plain codec using the UTF-8 charset.
input {
  beats {
    port => "5044"
    codec => plain {
      charset => "UTF-8"
    }
  }
}
# Replace the 'message' string with the array obtained by splitting it
# on the /\n/ pattern, so later filters see one element per line.
filter {
  ruby {
    code => "event.set('message', event.get('message').split(/\n/))"
  }
}
# Parse each log line, derive business fields with the Ruby script, set the
# event date, then drop the intermediate grok captures.
filter {
  # Capture timestamp, level, source file and payload from 'message'
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:[messageDate][]} %{WORD:[messageType][]} (?<fileSrc>.[a-zA-Z|.]+) - %{GREEDYDATA:[messageData][]}"}
  }

  # External Ruby script; it reads the captures above and sets the 'date',
  # 'lat' and 'lon' fields used further down the pipeline
  ruby {
    # path to Ruby script
    path => "/home/elk/logstash-6.2.1/flunch_logs.rb"
  }

  # Parse the 'date' field with the pattern below
  date {
    match => [ "date", "YYYY-MM-dd HH:mm:ss,SSS"]
  }

  # The raw captures are no longer needed once the script has run
  mutate {
    remove_field => ["messageData", "messageDate", "messageType"]
  }
}
# Mapping site code and geolocalisation: the temporary 'lat' / 'lon' fields
# are looked up in the geo.yml dictionary and the results are written under
# [geoip][location].
filter {
  # Latitude
  translate {
    dictionary_path => "/home/elk/logstash-6.2.1/geo.yml"
    field => "lat"
    destination => "[geoip][location][lat]"
  }

  # Longitude
  translate {
    dictionary_path => "/home/elk/logstash-6.2.1/geo.yml"
    field => "lon"
    destination => "[geoip][location][lon]"
  }

  # Suppress temp variables
  mutate {
    remove_field => [ "lat", "lon" ]
  }
}
# Send every event to the local Elasticsearch node and also print it to
# stdout with the rubydebug codec.
output {
  # ElasticsearchHost
  elasticsearch {
    hosts => ["localhost:9200"]
  }

  # fordebug
  stdout {
    codec => rubydebug
  }
}
And here is the Ruby filter script:
require 'date'
# Parses a log timestamp of the form "YYYY-MM-DD HH:MM:SS,mmm"
# (e.g. "2018-03-01 12:34:56,789") into a DateTime.
#
# @param date [String] timestamp text in the format above
# @return [DateTime] the parsed timestamp
# @raise [ArgumentError] when the text does not match the format
def parse_date(date)
  DateTime.strptime(date, '%Y-%m-%d %H:%M:%S,%L')
end
# Logstash ruby-filter entry point: enriches one grokked event with derived fields.
#
# Reads the 'messageType', 'messageDate' and 'messageData' captures.
# * When 'messageType' is nil (nothing was captured) the event is returned
#   untouched.
# * When 'messageType' is an Array (multi-line event) it sets 'state',
#   'restaurant', 'checkout_number', the temporary 'lat'/'lon' lookup keys,
#   'origin', 'mode', 'date' and 'total_time' (milliseconds between the first
#   and the last line).
# * Otherwise (single-line event) it sets 'origin' to 'application' and 'date'.
# In the last two cases it also sets 'service' ('lunch' before 16:00, 'dinner'
# otherwise) and replaces 'message' with an array of {type:, data:} hashes.
#
# @param event [#get, #set] the event being filtered
# @return [Array] a one-element array holding the same event
# @raise whatever parse_date raises when a timestamp does not match its format
def filter(event)
  message_type = event.get('messageType')
  message_date = event.get('messageDate')
  message_data = event.get('messageData')

  # `defined?(local)` is always truthy, so test for nil explicitly: without
  # captures there is nothing to enrich and no date to parse.
  return [event] if message_type.nil?

  if message_type.kind_of?(Array)
    event.set('state', message_type.include?('ERROR') ? 'error' : 'info')
    set_restaurant_fields(event)

    # Origin: askFid, commitFid, fidbridge
    event.set('origin', message_origin(message_data[0]))

    # Degraded mode
    mode = last_reported_mode(message_data)
    event.set('mode', mode) if mode

    date = message_date[0]
    event.set('date', date)
    result = message_type.zip(message_data).map { |type, data| { type: type, data: data } }

    # Total time between the first and the last line, in milliseconds
    parsed_date = parse_date(date)
    last_message = parse_date(message_date[-1])
    event.set('total_time', last_message.strftime('%Q').to_i - parsed_date.strftime('%Q').to_i)
  else
    date = message_date
    event.set('origin', 'application')
    event.set('date', date)
    result = [{ type: message_type, data: message_data }]
    parsed_date = parse_date(date)
  end

  event.set('service', parsed_date.hour < 16 ? 'lunch' : 'dinner')
  event.set('message', result)
  [event]
end

# Sets 'restaurant' and 'checkout_number' (plus the temporary 'lat'/'lon'
# dictionary keys) from the event's 'source' path; both fields are set to nil
# when the path is missing or does not match the expected pattern.
def set_restaurant_fields(event)
  match = event.get('source').to_s.match(/C([0-9]+)(FR\d+)-/)
  if match
    restaurant = match[1]
    event.set('restaurant', restaurant)
    event.set('checkout_number', match[2])
    # Temporary variables for geolocation
    event.set('lat', "#{restaurant}_lat")
    event.set('lon', "#{restaurant}_lon")
  else
    event.set('restaurant', nil)
    event.set('checkout_number', nil)
  end
end

# Classifies the first data line as 'askFid', 'commitFid', 'fidbridge' or 'undefined'.
def message_origin(first_data)
  if first_data.include?('requete')
    if first_data.include?('askFid')
      'askFid'
    elsif first_data.include?('commitFid')
      'commitFid'
    else
      'undefined'
    end
  elsif first_data.include?('fidbridge')
    'fidbridge'
  else
    'undefined'
  end
end

# Returns the mode implied by the data lines, or nil when no line sets one.
# NOTE(review): later lines override earlier ones, so a degraded mode reported
# early is replaced by 'nominal' if a later line is not an askFid error; this
# matches the previous behaviour -- confirm it is intended.
def last_reported_mode(message_data)
  mode = nil
  message_data.each do |msg|
    if msg.include?('erreur d\'appel askFid')
      if msg.include?('Aucun serveur joignable')
        mode = 'degrade_3'
      elsif msg.include?('secondaire')
        mode = 'degrade_2'
      elsif msg.include?('primaire')
        mode = 'degrade_1'
      end
    else
      mode = 'nominal'
    end
  end
  mode
end