Hi,
I have multiple log files that I want to load into Kibana. Each file begins with a few lines of configuration information that has to be added to every log line from that file.
Is there a way to extract these configuration settings, skip those lines, and add the information to every line that follows?
Some configuration info is also available in the file path. Is it possible to extract it in the Logstash input as a parameter and use it later in the output (PART_OF_THE_LOG_FILE_PATH_HERE) while loading multiple files?
One possibility is to combine tag_on_failure in grok with a Ruby global variable.
With grok, extract the values from the configuration lines. If no pattern matches (so the line is not a configuration line), tag_on_failure adds a marker to the tags field that means "send this event to Elasticsearch".
With ruby, save the configuration values and copy them onto the following lines that carry that marker.
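To make the idea concrete outside of Logstash, here is a minimal plain-Ruby sketch of the same carry-forward logic. It is only an illustration: the header keys in HEADER_PATTERN and the enrich helper are hypothetical, and a local hash stands in for the global variables.

```ruby
# Hypothetical header keys; in the real pipeline they come from the grok patterns.
HEADER_PATTERN = /\A\s*(Version|User|Task id):\s*(.*)\z/

# Returns one hash per ordinary log line, enriched with the header values seen so far.
def enrich(lines)
  memory = {} # plays the role of the Ruby global variables
  lines.each_with_object([]) do |line, out|
    if (m = HEADER_PATTERN.match(line))
      memory[m[1]] = m[2] # configuration line: remember the value, emit nothing
    else
      out << memory.merge('message' => line) # log line: emit it with the remembered values
    end
  end
end

sample = [
  'Version: 1.2',
  'User: alice',
  'first log line',
  'second log line'
]
enriched = enrich(sample)
enriched.each { |event| puts event }
```

The sketch depends on lines arriving in file order, which is also what the Logstash version needs.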
The configuration file may look like this:
input {
file {
path => ["C:/logs/**/*.log"]
sincedb_path => "NUL"
start_position => "beginning"
}
}
filter {
grok {
match => {
"message" => [
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}[=]+",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Version:%{SPACE}(?<Version>[^\n]*)",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Type%{SPACE}(?<Type>[^\n]*)",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Driver Version:%{SPACE}(?<DriverVersion>[^\n]*)",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}User:%{SPACE}(?<User>[^\n]*)",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Configuration%{SPACE}:%{SPACE}(?<Configuration>[^\n]*)",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Host name%{SPACE}:%{SPACE}(?<HostName>[^\n]*)",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Config file:%{SPACE}(?<ConfigFile>[^\n]*)",
"%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Task id:%{SPACE}(?<TaskId>[^\n]*)"
]
}
#If none of the patterns matches (so the line is not a configuration line), "send" is added to [tags]
#See tag_on_failure in the Logstash grok filter documentation
tag_on_failure => [ "send" ]
}
grok {
match => {
"message" => [
#grok pattern for next logs
]
}
}
ruby {
code =>
"
#tag_on_failure adds 'send' to the [tags] array, so read it as an array (nil when the event has no tags)
tags = event.get('tags') || [];
#If 'send' is not among the tags, this is a configuration line, so store whichever value grok captured
#to_s keeps the checks safe when a field is missing (event.get returns nil)
if !tags.include?('send') then
if !event.get('Version').to_s.empty? then
@@memoryVersion = event.get('Version');
elsif !event.get('Type').to_s.empty? then
@@memoryType = event.get('Type');
elsif !event.get('DriverVersion').to_s.empty? then
@@memoryDriverVersion = event.get('DriverVersion');
elsif !event.get('User').to_s.empty? then
@@memoryUser = event.get('User');
elsif !event.get('Configuration').to_s.empty? then
@@memoryConfiguration = event.get('Configuration');
elsif !event.get('HostName').to_s.empty? then
@@memoryHostName = event.get('HostName');
elsif !event.get('ConfigFile').to_s.empty? then
@@memoryConfigFile = event.get('ConfigFile');
elsif !event.get('TaskId').to_s.empty? then
@@memoryTaskId = event.get('TaskId');
else
@@memoryDate = event.get('Date');
end
#If 'send' is among the tags, copy the stored configuration values into the corresponding fields
else
event.set('Version', @@memoryVersion);
event.set('Type', @@memoryType);
event.set('DriverVersion', @@memoryDriverVersion);
event.set('User', @@memoryUser);
event.set('Configuration', @@memoryConfiguration);
event.set('HostName', @@memoryHostName);
event.set('ConfigFile', @@memoryConfigFile);
event.set('TaskId', @@memoryTaskId);
event.set('Date', @@memoryDate);
end
"
}
}
output {
#Only events that carry the "send" tag are sent
if "send" in [tags] {
stdout {codec => rubydebug}
elasticsearch {
hosts => ["localhost:9200"]
index => "test"
id => "PART_OF_THE_LOG_FILE_PATH_HERE"
}
}
}
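I haven't been able to test it, but I think this configuration is correct and should work with multiple source log files. Because the values are kept in shared Ruby variables, events have to be processed in order, so run Logstash with a single pipeline worker (-w 1).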
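For the part of the question about the file path, the extraction itself can be sketched in plain Ruby. The directory layout assumed below (C:/logs/<group>/<name>.log), the capture names group and name, and the path_parts helper are all hypothetical; adjust the pattern to the real layout.

```ruby
# Hypothetical layout: C:/logs/<group>/<name>.log
PATH_PATTERN = %r{/logs/(?<group>[^/]+)/(?<name>[^/]+)\.log\z}

# Returns the captured path segments, or an empty hash when the path does not match.
def path_parts(path)
  normalised = path.gsub('\\', '/') # accept Windows separators as well
  m = PATH_PATTERN.match(normalised)
  m ? { 'group' => m[:group], 'name' => m[:name] } : {}
end

parts = path_parts('C:/logs/serverA/run42.log')
puts parts
```

In a pipeline, the same kind of pattern could be applied with a grok filter on the field that holds the file path, and the captured field could then be referenced in the output with the %{fieldname} syntax. Which field holds the path depends on the file input version, so check that before relying on it.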