Hey, I am using a very simple aggregate filter to count entries coming from an SQL database.
If a specific event is seen, the counter should be lowered by one; otherwise it should count up.
Everything is running under a unique task id.
Code:
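filter {
  aggregate {
    task_id => "%{order}"
    code => "
      # initialise the counters the first time this task id is seen
      map['count'] ||= 0
      map['wip'] ||= 0
      # total number of events for this order
      map['count'] += 1
      # increment wip when partnumber is not 0
      if event.get('partnumber') != 0
        map['wip'] += 1
      end
      # decrement wip again when the final test is OK
      if event.get('finaltest_ok') == 1
        map['wip'] -= 1
      end
      event.set('counter', map['count'].to_f)
      event.set('bad_parts', map['wip'].to_f)
    "
  }
}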
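To make clear what I expect, here is a rough plain-Ruby sketch of the same counting logic, run outside Logstash. The helper name `apply_event` and the sample events are made up for illustration, and the events are plain hashes rather than real Logstash events.

```ruby
# Plain-Ruby stand-in for the aggregate code block.
# `apply_event` and the sample data are hypothetical; `event` is a Hash here,
# not a Logstash event object.
def apply_event(map, event)
  map['count'] ||= 0
  map['wip'] ||= 0
  map['count'] += 1
  map['wip'] += 1 if event['partnumber'] != 0
  map['wip'] -= 1 if event['finaltest_ok'] == 1
  { 'counter' => map['count'].to_f, 'bad_parts' => map['wip'].to_f }
end

# One map per task id, mirroring task_id => "%{order}".
maps = Hash.new { |hash, key| hash[key] = {} }

sample_events = [
  { 'order' => 'A', 'partnumber' => 11, 'finaltest_ok' => 0 },
  { 'order' => 'A', 'partnumber' => 12, 'finaltest_ok' => 1 },
  { 'order' => 'A', 'partnumber' => 13, 'finaltest_ok' => 0 }
]

results = sample_events.map { |e| apply_event(maps[e['order']], e) }
results.each { |r| puts r.inspect }
```

For these three events of the same order I would expect counter to go 1.0, 2.0, 3.0 and bad_parts to go 1.0, 1.0, 2.0, with no reset in between.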
But what I see in ES/Kibana looks as if the map has a timeout and the counter is being reset.
And here is my pipeline config:
- pipeline.id: database_test
  pipeline.workers: 1
  queue.type: persisted
  queue.checkpoint.acks: 0
  queue.checkpoint.writes: 0
  queue.checkpoint.interval: 0
  path.config: "/logstash-7.3.0/bin/database_test.conf"
Any idea why this is happening?