ID generation in sequential logs: trying to link a MAJOR event and MINOR events by a shared ID, using the elasticsearch filter (error _elasticsearch_lookup_failure) or the ruby filter

Here is my log (anonymized):

[DEBUG] 12-04-2019 07:13:28 [Import] - Columns= TABLE_DESCRIPTION(____COL1___COL2___COL3___COL4___5___6___7___8___9___10__..._________________________________)
[ INFO] 12-04-2019 07:13:28 [Import] - NomTable=MY_SQL_REQUEST
[ INFO] 12-04-2019 07:13:28 [Import] - Open Filename=MY_FILENAME
[DEBUG] 12-04-2019 07:13:35 [Import] - InitWriteToDB
[DEBUG] 12-04-2019 07:13:35 [Import] - Fin InitWriteToDB
[ INFO] 12-04-2019 07:13:35 [Import] - WriteToDB, maxrows=ROW_NUMBER
[ INFO] 12-04-2019 07:13:36 [Import] - Fin WriteToDB
[ INFO] 12-04-2019 07:13:36 [Import] - Close Filename=MY_FILENAME
[ INFO] 12-04-2019 07:13:36 [Import] - Open Filename=MY_FILENAME
[DEBUG] 12-04-2019 07:13:48 [Import] - InitWriteToDB
[DEBUG] 12-04-2019 07:13:48 [Import] - Fin InitWriteToDB
[ INFO] 12-04-2019 07:13:48 [Import] - WriteToDB, maxrows=ROW_NUMBER
[ INFO] 12-04-2019 07:13:48 [Import] - Fin WriteToDB
[ INFO] 12-04-2019 07:13:48 [Import] - Close Filename=MY_FILENAME
[ INFO] 12-04-2019 07:13:48 [Import] - Open Filename=MY_FILENAME
[DEBUG] 12-04-2019 07:13:55 [Import] - InitWriteToDB
[DEBUG] 12-04-2019 07:13:55 [Import] - Fin InitWriteToDB
[ INFO] 12-04-2019 07:13:55 [Import] - WriteToDB, maxrows=ROW_NUMBER
[ INFO] 12-04-2019 07:13:56 [Import] - Fin WriteToDB
[ INFO] 12-04-2019 07:13:56 [Import] - Close Filename=MY_FILENAME

In this pattern in my log I have two parts: the first part, [MY_SQL_REQUEST], is the "MAJOR" part, and the second part, [OPEN] ={ operation }=> [CLOSE], is the "MINOR" part.

The "operation" is generated by the "request" and number of operation geranated is variable sometimes you have 11 othertime 17 ...

My server runs the requests and operations sequentially, so the log is also sequential.

The problem for me is that "request" and "operation" are not linked by an "ID" in my log.
So I created an "OPERATION_ID" and a "CYCLE_ID" myself with a ruby filter; this was possible because my log is sequential.
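In simplified form (my full configuration is further down), each event type just gets an incrementing counter from a ruby filter; here is the cycle counter as an excerpt:

    filter {
      if [message] =~ /NomTable=/ {
        # one new cycle per "request" line
        ruby {
          init => "@counter_cycle = 0"
          code => "
            @counter_cycle += 1
            event.set('CYCLE_ID', @counter_cycle)
          "
        }
      }
    }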

Now the problem comes when I try to get the last "CYCLE_ID" from the "request (cycle)" event with the elasticsearch filter, in order to add it to the "operation (open & close)" events and create the link between "request" and "operation".
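What I am trying with the elasticsearch filter looks roughly like this (not my exact filter; hosts and index are placeholders): for each operation event, query the most recent indexed CYCLE_START document and copy its CYCLE_ID onto the event.

    filter {
      if [type] == "OPEN_OPERATION" or [type] == "CLOSE_OPERATION" {
        elasticsearch {
          hosts  => ["localhost:9200"]            # placeholder
          index  => "logstash-*"                  # placeholder
          query  => "type:CYCLE_START"            # find the cycle events
          sort   => "@timestamp:desc"             # most recent first
          fields => { "CYCLE_ID" => "CYCLE_ID" }  # copy CYCLE_ID onto this event
        }
      }
    }

When the lookup fails, the event gets the plugin's default tag _elasticsearch_lookup_failure, which is the error in the title.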

When I saw that it did not work, I tried the elasticsearch filter example to see whether it works, but I hit the same problem.

The problem with the elasticsearch filter seems to be that an event is not indexed by Logstash immediately when it is matched.

I think the events are held in a Logstash buffer that waits until it is full before indexing them all.

This creates the problem: when the elasticsearch filter tries to look up an event, it finds nothing, because that event is still in the buffer and not yet in the database.

I also tried to do it with the ruby filter only, but another problem appeared (see the commented-out block at the end of the configuration)...

Here is my configuration:

    filter {
      if [message] =~ /NomTable=/ {
        # "request" (cycle) event
        grok { match => { "message" => "%{DATESTAMP:DATE}" } }
        grok { match => { "message" => "(?<table_import>(?<=NomTable\=).*?(?=,))" } }
        grok { match => { "message" => "(?<interface_import>(?<=ImportID\=).*?(?=,))" } }
        grok { match => { "message" => "(?<input_directory>(?<=SearchFileName\=).*?(?=,))" } }
        grok { match => { "message" => "(?<backup_directory>(?<=PathNameBak\=).*?(?=,))" } }
        grok { match => { "input_directory" => "(?<FILE_PATTERN>[A-Za-z0-9-./\*_]+$)" } }
        grok { match => { "input_directory" => "(?<close_target>(?<=[A-Z]:\\(IN)\\)(?:(I|i)(M|m)(P|p)(O|o)(R|r)(T|t)_?(FT|CC))\\[A-Z0-9_]+(?=\\[A-Z0-9_]+\\[A-Za-z0-9-./\*_]+))" } }
        grok { match => { "close_target" => "(?<mode_de_transfert>(?<=\\)[A-Za-z0-9_].+?$)" } }
        grok { match => { "backup_directory" => "(?<IDF>(?<=\\\\)[A-Z0-9]+(?=\\\\$))" } }
        grok { match => { "backup_directory" => "(?<application_source>^[A-Z0-9]{3})" } }
        date {
          match => ["DATE", "dd-MM-yyyy HH:mm:ss"]
          add_field => { "type" => "CYCLE_START" }
        }
        ruby {
          # one new cycle per "request" line
          init => "@counter_cycle = 0"
          code => "
            @counter_cycle += 1
            event.set('CYCLE_ID', @counter_cycle)
          "
          remove_field => ["close_target"]
        }
      } else if [message] =~ /Open/ {
        # "operation" start event
        grok { match => { "message" => "%{DATESTAMP:DATE}" } }
        grok { match => { "message" => "(?<filename_full_path>(?<=Open Filename=).+)" } }
        date {
          match => ["DATE", "dd-MM-yyyy HH:mm:ss"]
          remove_field => ["message", "DATE"]
        }
        ruby {
          # one new operation per "Open" line
          init => "@counter_open = 0"
          code => "
            @counter_open += 1
            event.set('OPERATION_ID', @counter_open)
          "
          add_field => { "type" => "OPEN_OPERATION" }
          add_tag => ["operationStart"]
        }
      } else if [message] =~ /Close/ {
        # "operation" end event
        grok { match => { "message" => "%{DATESTAMP:DATE}" } }
        grok { match => { "message" => "(?<filename_full_path>(?<=Close Filename=).+)" } }
        date {
          match => ["DATE", "dd-MM-yyyy HH:mm:ss"]
        }
        ruby {
          # same counter logic as the "Open" branch
          init => "@counter_close = 0"
          code => "
            @counter_close += 1
            event.set('OPERATION_ID', @counter_close)
          "
          add_field => { "type" => "CLOSE_OPERATION" }
          add_tag => ["operationEnd"]
        }
      } else {
        drop {}
      }

      # The ruby-filter-only attempt mentioned above:
      # ruby {
      #   init => "@counter_close = 0; @counter_open = 0; @counter_cycle = 0"
      #   code => "
      #     if event.get('message') =~ /Close/
      #       @counter_close += 1
      #       event.set('OPERATION_ID', @counter_close)
      #       event.set('CYCLE_ID', @counter_cycle)
      #     elsif event.get('message') =~ /Open/
      #       @counter_open += 1
      #       event.set('OPERATION_ID', @counter_open)
      #       event.set('CYCLE_ID', @counter_cycle)
      #     elsif event.get('message') =~ /NomTable=/
      #       @counter_cycle += 1
      #       event.set('CYCLE_ID', @counter_cycle)
      #     end
      #   "
      #   add_field => { "type" => "CLOSE_OPERATION" }
      #   add_tag => ["operationEnd"]
      # }
    }
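To be clear about the goal: every operation event should end up carrying the CYCLE_ID of the cycle that produced it, for example (illustrative values only):

    { "type": "OPEN_OPERATION",  "OPERATION_ID": 1, "CYCLE_ID": 1, "filename_full_path": "MY_FILENAME" }
    { "type": "CLOSE_OPERATION", "OPERATION_ID": 1, "CYCLE_ID": 1, "filename_full_path": "MY_FILENAME" }
    { "type": "OPEN_OPERATION",  "OPERATION_ID": 2, "CYCLE_ID": 1, "filename_full_path": "MY_FILENAME" }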

The Logstash pipeline processes events in batches. --pipeline.batch.size can be used to adjust the batch size down from the default of 125.
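For example (the pipeline filename is a placeholder), either on the command line or in config/logstash.yml:

    # command line
    bin/logstash -f my_pipeline.conf --pipeline.batch.size 1

    # config/logstash.yml
    pipeline.batch.size: 1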

Thanks, I will test this.
