Aggregate logs if value of field is same

Hello,

I'm working on a project in my company. The goal is to implement the ELK stack to have traceability of our data, and to be able to aggregate logs that contain the same value in a given field.

I'm trying to aggregate some data, but I still don't fully understand how it works (I've read some documents and watched some tutorials on YouTube).

Currently, the logs we receive look like this:

Log 1:

LIBOR_20Y>>PROCESS1>>OIS_SPREAD/USD/OIS_SPREAD_USD_BSEUR_20Y>0.200625>2020-01-13
OIS_20Y>>PROCESS1>>OIS_SPREAD/USD/OIS_SPREAD_USD_BSEUR_20Y>0.200625>2020-01-13
EUR3M_20Y>>PROCESS1>>OIS_SPREAD/USD/OIS_SPREAD_USD_BSEUR_20Y>0.200625>2020-01-13
EUR3M_USD_20Y>>PROCESS1>>OIS_SPREAD/USD/OIS_SPREAD_USD_BSEUR_20Y>0.200625>2020-01-13
EURIB_20Y>>PROCESS1>>OIS_SPREAD/USD/OIS_SPREAD_USD_BSEUR_20Y>0.200625>2020-01-13
EONIA_20Y>>PROCESS1>>OIS_SPREAD/USD/OIS_SPREAD_USD_BSEUR_20Y>0.200625>2020-01-13

Log 2:

APPLICATION1>>>OIS>>PROCESS2>>OIS_20Y>-0.232323>2020-01-13
APPLICATION1>>>EONIA>>PROCESS2>>EONIA_20Y> 0.323232>2020-01-13

We can see that field 1 (line 2) of log 1 matches field 4 (line 1) of log 2.
Likewise, field 1 (line 6) of log 1 matches field 4 (line 2) of log 2.

How could I aggregate logs when they arrive and share the same field value?

It will always be field 1 of log 1 that matches field 4 of log 2.

For info, the conf file looks like this:

######################################## Input: log files to analyse ########################################
input {
	file {
		path => "D:/elasticsearch/logstash-7.5.0/matrisk_logs/pyrrhus/*.log"
		type => "pyrrhus"
		start_position => "beginning"
	}

	file {
		path => "D:/elasticsearch/logstash-7.5.0/matrisk_logs/brutus/*.log"
		type => "brutus"
		start_position => "beginning"
	}
}
######################################## Filters ########################################
filter {
	# Drop blank lines before any parsing, whatever the source type.
	if [message] =~ /^\s*$/ {
		drop { }
	}

	######## pyrrhus logs ########
	# NOTE(review): this pattern expects an "application>>>" prefix, which the
	# sample lines shown for log 1 do not have — confirm which sample maps to
	# which type, or the grok will tag _grokparsefailure on every line.
	if [type] == "pyrrhus" {
		grok {
			match => [
			"message", "%{GREEDYDATA:application}>>>%{GREEDYDATA:foldername}>>%{GREEDYDATA:process}>>%{GREEDYDATA:foldernamematurite}>%{GREEDYDATA:value}>%{GREEDYDATA:date}"
			]
		}
		# remove_tag / remove_field are options of the mutate filter; they are
		# not valid as bare statements inside an if block.
		mutate {
			remove_tag => [ "_grokparsefailure" ]	# tag added when a line does not match the pattern
			remove_field => [ "@version", "host" ]	# remove unused fields
		}
	}

	######## brutus logs ########
	if [type] == "brutus" {
		grok {
			match => [
			"message", ">>>%{GREEDYDATA:foldername}>>%{GREEDYDATA:process}>>%{GREEDYDATA:pathfolder}>%{GREEDYDATA:value}>%{GREEDYDATA:date}"
			]
		}
		mutate {
			remove_field => [ "@version", "host" ]	# remove unused fields
		}
	}
}
######################################## Output: Elasticsearch index consulted via Kibana ########################################
output {
	stdout {}
	elasticsearch {
		hosts => "http://localhost:9200"
		index => "carto1"
	}
}

Thank you in advance for your help

Best regards
Julien

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.