I use the aggregate filter plugin to extract metrics from my application's log, with the following config:
input {
  file {
    path => "application.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  grok {
    match => {
      "message" => "%{LOG_PATTERN}"
    }
  }

  if ([module] == "input") {
    # First event of a request: create the map and store the input timestamp
    aggregate {
      task_id => "%{req_id}"
      code => "
        map['in_timestamp'] = event.get('timestamp')
        map['module_time'] = {}
      "
      map_action => "create"
    }
  }
  else if ([module] == "output") {
    # Last event of a request: copy the aggregated data onto the event and close the task
    aggregate {
      task_id => "%{req_id}"
      code => "
        event.set('module_time', map['module_time'])
        event.set('in_timestamp', map['in_timestamp'])
      "
      map_action => "update"
      end_of_task => true
    }
  }
  else {
    # Intermediate modules: record each module's timestamp in the map
    aggregate {
      task_id => "%{req_id}"
      code => "
        map['module_time'][event.get('module')] = event.get('timestamp')
      "
      map_action => "update"
    }
  }
}

output {
  file {
    path => "application.json"
    codec => "rubydebug"
  }
}
I set the number of Logstash pipeline workers to 1, both in logstash.yml (pipeline.workers: 1) and on the command line (-w 1), but it looks like several workers are still being used.
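For reference, this is roughly what I have (the pipeline config file name is just a placeholder here):

  # logstash.yml
  pipeline.workers: 1

  # command line, same setting passed explicitly
  bin/logstash -f pipeline.conf -w 1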
In the output, I see that the event corresponding to line 2016 of my log has a timestamp earlier than the event corresponding to line 2002.
Both lines belong to the same req_id, but line 2016 corresponds to the output module (the branch with end_of_task => true), so I am losing data.
I can work around the issue for this req_id by changing the value of "pipeline.batch.size" in logstash.yml, but then the same issue occurs for other req_id values.
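The workaround I mean is simply increasing the batch size in logstash.yml (the value below is only an example; the default is 125):

  pipeline.batch.size: 500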
What am I doing wrong?
How can I check the number of workers actually used by Logstash?
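Is querying the Logstash monitoring API the right way to do that? I was thinking of something like the call below (9600 is the default API port), but I don't know whether the reported value reflects the worker threads actually in use:

  curl -XGET 'http://localhost:9600/_node/pipelines?pretty'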