ID generation in sequential logs: trying to link a MAJOR event to its MINOR events with a shared ID, using the elasticsearch filter (error _elasticsearch_lookup_failure) or the ruby filter

Here is my log, anonymized:

[DEBUG] 12-04-2019 07:13:28 [Import] - Columns= TABLE_DESCRIPTION(____COL1___COL2___COL3___COL4___5___6___7___8___9___10__..._________________________________)
[ INFO] 12-04-2019 07:13:28 [Import] - NomTable=MY_SQL_REQUEST
[ INFO] 12-04-2019 07:13:28 [Import] - Open Filename=MY_FILENAME
[DEBUG] 12-04-2019 07:13:35 [Import] - InitWriteToDB
[DEBUG] 12-04-2019 07:13:35 [Import] - Fin InitWriteToDB
[ INFO] 12-04-2019 07:13:35 [Import] - WriteToDB, maxrows=ROW_NUMBER
[ INFO] 12-04-2019 07:13:36 [Import] - Fin WriteToDB
[ INFO] 12-04-2019 07:13:36 [Import] - Close Filename=MY_FILENAME
[ INFO] 12-04-2019 07:13:36 [Import] - Open Filename=MY_FILENAME
[DEBUG] 12-04-2019 07:13:48 [Import] - InitWriteToDB
[DEBUG] 12-04-2019 07:13:48 [Import] - Fin InitWriteToDB
[ INFO] 12-04-2019 07:13:48 [Import] - WriteToDB, maxrows=ROW_NUMBER
[ INFO] 12-04-2019 07:13:48 [Import] - Fin WriteToDB
[ INFO] 12-04-2019 07:13:48 [Import] - Close Filename=MY_FILENAME
[ INFO] 12-04-2019 07:13:48 [Import] - Open Filename=MY_FILENAME
[DEBUG] 12-04-2019 07:13:55 [Import] - InitWriteToDB
[DEBUG] 12-04-2019 07:13:55 [Import] - Fin InitWriteToDB
[ INFO] 12-04-2019 07:13:55 [Import] - WriteToDB, maxrows=ROW_NUMBER
[ INFO] 12-04-2019 07:13:56 [Import] - Fin WriteToDB
[ INFO] 12-04-2019 07:13:56 [Import] - Close Filename=MY_FILENAME

In this pattern my log has two parts: the first part, [MY_SQL_REQUEST], is the "MAJOR" part, and the second part, [OPEN] ={ operation }=> [CLOSE], is the "MINOR" part.

The "operation" is generated by the "request" and number of operation geranated is variable sometimes you have 11 othertime 17 ...

My server runs requests and operations sequentially, so the log is also sequential.

The problem for me is that the "request" and its "operations" are not linked by any "ID" in my log.
So I created an "OPERATION_ID" and a "CYCLE_ID" myself with a ruby filter, which was possible because my log is sequential.

Now the problem comes when I try to get the last "CYCLE_ID" from the "request (cycle) event" with the elasticsearch filter, so that I can add it to the "operation (open & close) events" and create the link between "request" and "operation".
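
A minimal sketch of the kind of lookup I tried (the index name matches the error output below and the "started" field shows up in my events; the hosts, query, and field mapping here are illustrative):

    elasticsearch {
        hosts  => ["localhost:9200"]
        index  => "vximport"
        query  => "type:CYCLE_START"
        sort   => "@timestamp:desc"
        fields => { "CYCLE_ID"   => "CYCLE_ID"
                    "@timestamp" => "started" }
    }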

When I saw it didn't work, I tried the elasticsearch filter example to see if that would work, but I had the same problem.

The problem with the elasticsearch filter seems to be that an event is not inserted into Elasticsearch immediately when Logstash matches it.

I think events are held in a Logstash buffer that waits until it is full before inserting them all.

This creates the problem: when the elasticsearch filter tries to look up an event, it finds nothing, because that event is still in the buffer and not yet in the database.

I also tried to do it with only a ruby filter, but another problem appeared...

Here is my configuration:

    filter {
      if [message] =~ /NomTable=/ {
        # MAJOR event: the request (cycle) line
        grok { match => { "message" => "%{DATESTAMP:DATE}" } }
        grok { match => { "message" => "(?<table_import>(?<=NomTable\=).*?(?=,))" } }
        grok { match => { "message" => "(?<interface_import>(?<=ImportID\=).*?(?=,))" } }
        grok { match => { "message" => "(?<input_directory>(?<=SearchFileName\=).*?(?=,))" } }
        grok { match => { "message" => "(?<backup_directory>(?<=PathNameBak\=).*?(?=,))" } }
        grok { match => { "input_directory" => "(?<FILE_PATTERN>[A-Za-z0-9-./\*_]+$)" } }
        grok { match => { "input_directory" => "(?<close_target>(?<=[A-Z]:\\(IN)\\)(?:(I|i)(M|m)(P|p)(O|o)(R|r)(T|t)_?(FT|CC))\\[A-Z0-9_]+(?=\\[A-Z0-9_]+\\[A-Za-z0-9-./\*_]+))" } }
        grok { match => { "close_target" => "(?<mode_de_transfert>(?<=\\)[A-Za-z0-9_].+?$)" } }
        grok { match => { "backup_directory" => "(?<IDF>(?<=\\\\)[A-Z0-9]+(?=\\\\$))" } }
        grok { match => { "backup_directory" => "(?<application_source>^[A-Z0-9]{3})" } }
        date {
          match     => ["DATE", "dd-MM-yyyy HH:mm:ss"]
          add_field => { "type" => "CYCLE_START" }
        }
        ruby {
          init => "@counter_cycle = 0"
          code => "
            @counter_cycle += 1
            event.set('CYCLE_ID', @counter_cycle)
          "
          remove_field => ["close_target"]
        }
      } else if [message] =~ /Open/ {
        # MINOR event: start of an operation
        grok { match => { "message" => "%{DATESTAMP:DATE}" } }
        grok { match => { "message" => "(?<filename_full_path>(?<=Open Filename=).+)" } }
        date {
          match        => ["DATE", "dd-MM-yyyy HH:mm:ss"]
          remove_field => ["message", "DATE"]
        }
        ruby {
          init => "@counter_open = 0"
          code => "
            @counter_open += 1
            event.set('OPERATION_ID', @counter_open)
          "
          add_field => { "type" => "OPEN_OPERATION" }
          add_tag   => [ "operationStart" ]
        }
      } else if [message] =~ /Close/ {
        # MINOR event: end of an operation
        grok { match => { "message" => "%{DATESTAMP:DATE}" } }
        grok { match => { "message" => "(?<filename_full_path>(?<=Close Filename=).+)" } }
        date {
          match => ["DATE", "dd-MM-yyyy HH:mm:ss"]
        }
        ruby {
          init => "@counter_close = 0"
          code => "
            @counter_close += 1
            event.set('OPERATION_ID', @counter_close)
          "
          add_field => { "type" => "CLOSE_OPERATION" }
          add_tag   => [ "operationEnd" ]
        }
      } else {
        drop {}
      }
		
#	ruby {
#	init => "@counter_close = 0 @counter_open = 0 @counter_cycle = 0"
#	code => "
#	if (event.get('message') =~ /Close/) {
#	
#	@counter_close += 1
#	event.set('OPERATION_ID', @counter_close)
#	event.set('CYCLE_ID', @counter_cycle)
#	
#	} else if (event.get('message') =~ /Open/) {
#						
#	@counter_open += 1
#	event.set('OPERATION_ID', @counter_open)
#	event.set('CYCLE_ID', @counter_cycle)
#	
#	} else if (event.get('message') =~ /NomTable=/) {
#						
#	@counter_cycle += 1
#	event.set('CYCLE_ID', @counter_cycle)						
#	} else {
#	return
#	}
#	"
#	add_field=>{"type"=>"CLOSE_OPERATION"}
#	add_tag => [ "operationEnd" ]
#	}
#	}
}

Here is my error output:

 [ERROR] 2019-07-15 14:24:53.141 [[main]>worker0] ruby - Ruby exception occurred: can't convert nil into an exact number
[WARN ] 2019-07-15 14:24:53.166 [[main]>worker0] elasticsearch - Failed to query elasticsearch for previous event {:index=>"vximport", :error=>"[404] {\"error\":{\"root_cause\":[{\"type\":\"index_not_found_exception\",\"reason\":\"no such index\",\"resource.type\":\"index_or_alias\",\"resource.id\":\"vximport\",\"index_uuid\":\"_na_\",\"index\":\"vximport\"}],\"type\":\"index_not_found_exception\",\"reason\":\"no such index\",\"resource.type\":\"index_or_alias\",\"resource.id\":\"vximport\",\"index_uuid\":\"_na_\",\"index\":\"vximport\"},\"status\":404}"}
[ERROR] 2019-07-15 14:24:53.168 [[main]>worker0] ruby - Ruby exception occurred: can't convert nil into an exact number
[WARN ] 2019-07-15 14:24:53.196 [[main]>worker0] elasticsearch - Failed to query elasticsearch for previous event {:index=>"vximport", :error=>"[404] {\"error\":{\"root_cause\":[{\"type\":\"index_not_found_exception\",\"reason\":\"no such index\",\"resource.type\":\"index_or_alias\",\"resource.id\":\"vximport\",\"index_uuid\":\"_na_\",\"index\":\"vximport\"}],\"type\":\"index_not_found_exception\",\"reason\":\"no such index\",\"resource.type\":\"index_or_alias\",\"resource.id\":\"vximport\",\"index_uuid\":\"_na_\",\"index\":\"vximport\"},\"status\":404}"}
[ERROR] 2019-07-15 14:24:53.198 [[main]>worker0] ruby - Ruby exception occurred: can't convert nil into an exact number
[INFO ] 2019-07-15 14:24:55.115 [[main]>worker0] file - Opening file {:path=>"/home/username2/vx/import/instance/vx_results.json"}
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated

Here is an event before insert:

{
            "path" => "/home/username2/vx/import/instance/vximport_text.txt",
      "@timestamp" => 2019-04-12T07:15:15.000Z,
            "tags" => [
        [0] "_grokparsefailure",
        [1] "operationEnd",
        [2] "_elasticsearch_lookup_failure",
        [3] "_rubyexception"
    ],
         "message" => "[ INFO] 12-04-2019 07:15:15 [Import] - Close Filename=E:\\IN\\IMPORT_FT\\CFT\\A_INTEGRER\\09BCP102\\20190412.06340645.D1206375.ITZ44G2.txt",
    "OPERATION_ID" => 16,
            "type" => "CLOSE_OPERATION",
        "@version" => "1",
            "host" => "servername2",
            "DATE" => "12-04-2019 07:15:15"
}

Here is an event after the buffer insert:

{
            "@timestamp" => 2019-04-12T07:15:29.000Z,
              "DURATION" => 0.03361111111111111,
               "started" => 2019-04-12T07:13:28.000Z,
    "ELASTIC_OUT_SECOND" => "[started]",
                  "path" => "/home/username2/vx/import/instance/vximport_text.txt",
                  "tags" => [
        [0] "_grokparsefailure",
        [1] "operationEnd"
    ],
               "message" => "[ INFO] 12-04-2019 07:15:29 [Import] - Close Filename=E:\\IN\\IMPORT_FT\\CFT\\A_INTEGRER\\09BCP102\\20190412.06341332.D1206383.ITZ44G2.txt",
     "ELASTIC_OUT_FIRST" => "started",
          "OPERATION_ID" => 37,
                  "type" => "CLOSE_OPERATION",
              "@version" => "1",
                  "host" => "servername2",
                  "DATE" => "12-04-2019 07:15:29",
     "ELASTIC_OUT_THIRD" => "[started]"
}

But the DURATION is wrong because it was computed from the old event that was still in the buffer: 07:15:29 minus the "started" value 07:13:28 gives 121 s ≈ 0.0336 h, which is the DURATION above, even though the matching Open for this file happened much later than 07:13:28.

Thanks for reading and for any help :grinning:

If someone knows a way to force Logstash to insert events directly into the database instead of holding them in a buffer, I think that would solve my problem, or maybe I need to use Logstash in a different way...

The Logstash pipeline processes events in batches. --pipeline.batch.size can be used to adjust the batch size down from the default of 125.
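
For example (a sketch; the pipeline file name is illustrative):

    bin/logstash -f my_pipeline.conf --pipeline.batch.size 1

    # or equivalently in logstash.yml
    pipeline.batch.size: 1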

Thanks, I will test this.

Finally, reducing --pipeline.batch.size to 1 and --pipeline.batch.delay to 1 did not work. I tried many other options, but the elasticsearch filter cannot fetch the data dynamically the way I want, so I found another way, using the ruby filter, which works:

filter {
	if [message] =~ /NomTable/ or [message] =~ /Open/ or [message] =~ /Close/ {
		# SHARED ID: the counter is incremented on each MAJOR event (NomTable),
		# and every MAJOR and MINOR event is stamped with the current value as CYCLE_ID
		ruby {
			init => "@counter_cycle = 0"
			code => "
				event.get('message') =~ /NomTable/ ? @counter_cycle += 1 : @counter_cycle += 0
				event.set('CYCLE_ID', @counter_cycle)
			"
		}
		if [message] =~ /NomTable=/ {
			grok {
				match     => { "message" => "%{DATESTAMP:date_string}" }
				add_field => { "type" => "CYCLE_START" }
			}
			grok { match => { "message" => "(?<table_import>(?<=NomTable\=).*?(?=,))" } }
			grok { match => { "message" => "(?<interface_import>(?<=ImportID\=).*?(?=,))" } }
			grok { match => { "message" => "(?<input_directory>(?<=SearchFileName\=).*?(?=,))" } }
			grok { match => { "message" => "(?<backup_directory>(?<=PathNameBak\=).*?(?=,))" } }
			grok { match => { "input_directory" => "(?<FILE_PATTERN>[A-Za-z0-9-./\*_]+$)" } }
			grok { match => { "input_directory" => "(?<close_target>(?<=[A-Z]:\\(IN)\\)(?:(I|i)(M|m)(P|p)(O|o)(R|r)(T|t)_?(FT|CC))\\[A-Z0-9_]+(?=\\[A-Z0-9_]+\\[A-Za-z0-9-./\*_]+))" } }
			grok { match => { "close_target" => "(?<mode_de_transfert>(?<=\\)[A-Za-z0-9_].+?$)" } }
			grok {
				match        => { "backup_directory" => "(?<IDF>\b[A-Z0-9]+(?=\\$))" }
				remove_field => ["close_target"]
			}
			date {
				match => ["date_string", "dd-MM-yyyy HH:mm:ss"]
			}
		} else if [message] =~ /Open/ {
			grok { match => { "message" => "%{DATESTAMP:DATE}" } }
			grok { match => { "message" => "(?<filename_full_path>(?<=Filename=).+)" } }
			date {
				match => ["DATE", "dd-MM-yyyy HH:mm:ss"]
			}
			ruby {
				init => "@counter_open = 0"
				code => "
					@counter_open += 1
					event.set('OPERATION_ID', @counter_open)
				"
				add_field => { "type" => "OPEN_OPERATION" }
				add_tag   => ["operationStart"]
			}
		} else if [message] =~ /Close/ {
			grok { match => { "message" => "%{DATESTAMP:DATE}" } }
			grok { match => { "message" => "(?<filename_full_path>(?<=Filename=).+)" } }
			date {
				match => ["DATE", "dd-MM-yyyy HH:mm:ss"]
			}
			ruby {
				init => "@counter_close = 0"
				code => "
					@counter_close += 1
					event.set('OPERATION_ID', @counter_close)
				"
				add_field => { "type" => "CLOSE_OPERATION" }
				add_tag   => ["operationEnd"]
			}
		}
		# DURATION: remember the @timestamp of the last Open event,
		# then on each Close event compute its @timestamp minus that value
		ruby {
			init => "@type_open_timestamp = nil"
			code => "
				event.get('message') =~ /Open/ ? @type_open_timestamp = event.get('@timestamp') : @type_open_timestamp = @type_open_timestamp
				event.get('message') =~ /Close/ ? event.set('DURATION', (event.get('@timestamp') - @type_open_timestamp)) : @type_open_timestamp = @type_open_timestamp
			"
		}
	} else {
		drop {}
	}
}

ruby filter + ternary condition
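
To make the trick explicit, here is the shared-counter idea in plain Ruby, outside of Logstash (a standalone sketch; the sample lines are illustrative):

    # A MAJOR line (NomTable) bumps the counter; every line, MAJOR or MINOR,
    # is stamped with the current cycle id.
    lines = [
      "NomTable=MY_SQL_REQUEST",
      "Open Filename=MY_FILENAME",
      "Close Filename=MY_FILENAME",
      "NomTable=MY_SQL_REQUEST",
      "Open Filename=MY_FILENAME"
    ]

    counter_cycle = 0
    lines.each do |message|
      # same ternary idea as in the ruby filter above: bump on MAJOR, keep otherwise
      counter_cycle = message =~ /NomTable/ ? counter_cycle + 1 : counter_cycle
      puts "CYCLE_ID=#{counter_cycle} #{message}"
    end
    # prints CYCLE_ID=1 for the first three lines and CYCLE_ID=2 for the last two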
