Can Logstash read multiple text files, store the config data from their first lines, and add it to all the other lines?

Hi,
I have multiple log files that I want to load into Kibana. Each file begins with a few lines of configuration information that has to be added to every log line from that file.

These lines look like this:

        13-Oct-2020 12:06:20  | TST | **** | ====================================
        13-Oct-2020 12:06:20  | TST | **** | Version: 1.25.1
        13-Oct-2020 12:06:20  | TST | **** | Type 32
        13-Oct-2020 12:06:20  | TST | **** | Driver Version: 1.1.0.4
        13-Oct-2020 12:06:20  | TST | **** | User: abcde
        13-Oct-2020 12:06:20  | TST | **** | Configuration      : Auto
        13-Oct-2020 12:06:20  | TST | **** | Host name          : SLAVE-1
        13-Oct-2020 12:06:20  | TST | **** | Config file: \\path\file.config
        13-Oct-2020 12:06:20  | TST | **** | Task id: 1254545-78810521

Is there a way to extract these configuration settings, skip those lines, and add the information to each line that follows them?

Also, some configuration info is available in the file path. Is it possible to extract it in the Logstash input as a parameter and use it later in the output (PART_OF_THE_LOG_FILE_PATH_HERE) while loading multiple files?

    input {
        file {
            path => ["C:/logs/**/*.log"]
            sincedb_path => "NULL"
            start_position => "beginning"
        }
    }

    output {
        stdout { codec => rubydebug }
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "test"
            id => "PART_OF_THE_LOG_FILE_PATH_HERE"
        }
    }

Hi,

One possibility is to use tag_on_failure in grok together with Ruby class variables (@@), which keep their values from one event to the next.

With grok, extract the values from the configuration lines. If no pattern matches (so the line is not a configuration line), grok adds a specific tag that marks the event as one to send to Elasticsearch.

With ruby, save the configuration values, then copy them onto the following lines, the ones tagged to be sent to Elasticsearch.
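The two steps above can be sketched in plain Ruby, outside Logstash, to show the carry-forward logic on its own. This is only an illustration under assumptions: the `enrich` method, the simplified `HEADER_LINE` pattern, and the final `INFO | job started` line are hypothetical and not part of the original logs or of any Logstash API.

```ruby
# Header keys taken from the sample lines in the question.
HEADER_KEYS = ['Driver Version', 'Version', 'Type', 'User', 'Configuration',
               'Host name', 'Config file', 'Task id'].freeze

# Simplified, hypothetical pattern: a pipe, a known key, an optional colon, the value.
HEADER_LINE = /\|\s*(#{Regexp.union(HEADER_KEYS)})\s*:?\s*(.*?)\s*\z/
# The "=====" separator line carries no value and is skipped.
SEPARATOR_LINE = /\|\s*=+\s*\z/

# Returns one hash per regular log line, enriched with the header values seen so far.
def enrich(lines)
  memory = {} # plays the role of the @@memory... variables in the ruby filter
  events = []
  lines.each do |line|
    if (m = HEADER_LINE.match(line))
      memory[m[1]] = m[2]            # configuration line: remember, do not emit
    elsif SEPARATOR_LINE.match?(line)
      next                           # separator line: nothing to remember
    else
      events << memory.merge('message' => line.strip) # regular line: emit enriched
    end
  end
  events
end

sample = [
  '13-Oct-2020 12:06:20  | TST | **** | ====================================',
  '13-Oct-2020 12:06:20  | TST | **** | Version: 1.25.1',
  '13-Oct-2020 12:06:20  | TST | **** | Type 32',
  '13-Oct-2020 12:06:21  | TST | INFO | job started' # hypothetical log line
]
enrich(sample).each { |event| p event }
```

Only the last sample line produces an event, and that event carries the `Version` and `Type` values remembered from the lines before it.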

The configuration file may look like this:

input {
    file {
        path => ["C:/logs/**/*.log"]
        #NUL is the Windows null device; "NULL" would create a regular file named NULL
        sincedb_path => "NUL"
        start_position => "beginning"
    }
}

filter {
    grok {
        match => { 
            "message" => [
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}[=]+",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Version:%{SPACE}(?<Version>[^\n]*)",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Type%{SPACE}(?<Type>[^\n]*)",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Driver Version:%{SPACE}(?<DriverVersion>[^\n]*)",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}User:%{SPACE}(?<User>[^\n]*)",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Configuration%{SPACE}:%{SPACE}(?<Configuration>[^\n]*)",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Host name%{SPACE}:%{SPACE}(?<HostName>[^\n]*)",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Config file:%{SPACE}(?<ConfigFile>[^\n]*)",
                "%{SPACE}(?<Date>%{INT}-%{MONTH}-%{YEAR}%{SPACE}%{TIME})%{SPACE}\|%{SPACE}%{WORD}%{SPACE}\|%{SPACE}%{DATA:value}%{SPACE}\|%{SPACE}Task id:%{SPACE}(?<TaskId>[^\n]*)"
            ]
        }
        #If none of the patterns matches (so it's not a configuration line), grok adds "send" to [tags]
        #See tag_on_failure in the Logstash grok filter documentation
        tag_on_failure => [ "send" ]
    }

    grok {
        match => {
            "message" => [
                #grok pattern for next logs
            ]
        }
    }
                
    ruby {
        #Class variables (@@) are shared by all events in this filter, so run with pipeline.workers set to 1
        #init defines them first, so a log line seen before any configuration line does not raise NameError
        init => "@@memoryVersion = @@memoryType = @@memoryDriverVersion = @@memoryUser = @@memoryConfiguration = @@memoryHostName = @@memoryConfigFile = @@memoryTaskId = @@memoryDate = nil"
        code => 
            "
            #tag_on_failure writes to the tags array, which is absent when no tag was added
            tags = event.get('tags') || [];
            #If 'send' is not in tags, the line is a configuration line, so I store its value
            if !tags.include?('send') then
                #to_s turns a missing (nil) field into an empty string before the empty? check
                if !event.get('Version').to_s.empty? then
                    @@memoryVersion = event.get('Version');
                elsif !event.get('Type').to_s.empty? then
                    @@memoryType = event.get('Type');
                elsif !event.get('DriverVersion').to_s.empty? then
                    @@memoryDriverVersion = event.get('DriverVersion');
                elsif !event.get('User').to_s.empty? then
                    @@memoryUser = event.get('User');
                elsif !event.get('Configuration').to_s.empty? then
                    @@memoryConfiguration = event.get('Configuration');
                elsif !event.get('HostName').to_s.empty? then
                    @@memoryHostName = event.get('HostName');
                elsif !event.get('ConfigFile').to_s.empty? then
                    @@memoryConfigFile = event.get('ConfigFile');
                elsif !event.get('TaskId').to_s.empty? then
                    @@memoryTaskId = event.get('TaskId');
                else 
                    @@memoryDate = event.get('Date'); 
                end
            #If 'send' is in tags, I copy the configuration values saved before into the corresponding fields.
            else
                event.set('Version', @@memoryVersion);
                event.set('Type', @@memoryType);
                event.set('DriverVersion', @@memoryDriverVersion);
                event.set('User', @@memoryUser);
                event.set('Configuration', @@memoryConfiguration);
                event.set('HostName', @@memoryHostName);
                event.set('ConfigFile', @@memoryConfigFile);
                event.set('TaskId', @@memoryTaskId);
                event.set('Date', @@memoryDate);
            end
            "
    }
}

output {
    #Only events tagged "send" (the regular log lines) are sent to the outputs
    if "send" in [tags] {
        stdout {codec => rubydebug}
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "test"
            id => "PART_OF_THE_LOG_FILE_PATH_HERE"
        }
    }
}

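I was not able to test it, but I think this configuration is right and will work with multiple source log files, provided the files are read one after another with a single pipeline worker, because the stored values are shared by all files.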
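For the case where lines from several files may be interleaved, and for the question about using part of the file path, one option is to key the remembered values by the source path. The plain-Ruby sketch below is an assumption, not tested in Logstash: `HeaderMemory`, `path_part`, and the sample paths are hypothetical names. In a ruby filter the path would come from the event, which the file input stores in the `path` field (or `[log][file][path]` when ECS compatibility is enabled).

```ruby
# Keeps one set of remembered header values per source file.
class HeaderMemory
  def initialize
    # Each new path automatically gets its own empty hash.
    @by_path = Hash.new { |hash, path| hash[path] = {} }
  end

  # Store a header value for the file the line came from.
  def remember(path, key, value)
    @by_path[path][key] = value
  end

  # Return a copy of the header values seen so far for that file.
  def recall(path)
    @by_path[path].dup
  end
end

# Hypothetical choice of "part of the path": the name of the directory
# that contains the log file. Assumes forward slashes in the path.
def path_part(path)
  File.basename(File.dirname(path))
end

memory = HeaderMemory.new
memory.remember('C:/logs/run-a/app.log', 'Version', '1.25.1')
memory.remember('C:/logs/run-b/app.log', 'Version', '2.0.0')

p memory.recall('C:/logs/run-a/app.log')
p path_part('C:/logs/run-a/app.log')
```

A value derived this way could be written to an event field and referenced from the output with the usual `%{fieldname}` syntax. If the goal is to set the Elasticsearch document ID, note that the elasticsearch output uses `document_id` for that; `id` only names the plugin instance.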

Cad.

There is an example of doing something similar to that here.