Hi all,
I've created a pipeline that aggregates all input documents and generates a count for each group.
On the first execution the aggregation works fine and the count is correct, but at the moment I have two problems:
- after each subsequent execution every count doubles; I think this is because the map remains in memory
- all input documents are sent to the output, not just the aggregates

What I want is:
- a way to release the map after each execution
- to send only the aggregated documents to the output
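To be concrete about what I'm trying to compute, the counting logic is just this (plain Ruby, outside Logstash; the field names are from my documents, the sample values are made up):

```ruby
require 'time'

# Each document has a requestor and a creation timestamp;
# I want one count per (requestor, day) pair.
docs = [
  { 'reg512UPNRequestor' => 'alice', 'reg512Created' => '2023-05-01T10:00:00Z' },
  { 'reg512UPNRequestor' => 'alice', 'reg512Created' => '2023-05-01T15:30:00Z' },
  { 'reg512UPNRequestor' => 'bob',   'reg512Created' => '2023-05-01T09:00:00Z' },
]

counts = Hash.new(0)
docs.each do |doc|
  day = Time.parse(doc['reg512Created']).strftime('%d-%m-%Y')
  counts["#{doc['reg512UPNRequestor']}-#{day}"] += 1
end

counts.each { |k, v| puts "#{k} #{v}" }
# alice-01-05-2023 2
# bob-01-05-2023 1
```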
Please help me, thank you in advance.
Diego
Here is my pipeline:
input {
  elasticsearch {
    # read all documents
  }
}
filter {
  ruby {
    code => "
      require 'time'
      if event.get('reg512Created')
        date = Time.parse(event.get('reg512Created'))
        event.set('reg512Created_day', date.strftime('%d-%m-%Y'))
      end
    "
  }
  aggregate {
    task_id => "%{reg512UPNRequestor}-%{reg512Created_day}"
    code => "
      map['regCount'] ||= 0; map['regCount'] += 1
      event.set('regCount', map['regCount'])
    "
    push_previous_map_as_event => true
    timeout => 45
  }
  aggregate {
    task_id => "%{reg512UPNRequestor}-%{reg512Created_day}"
    end_of_task => true
  }
}
output {
  elasticsearch {
    # receive all documents
  }
}
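For what it's worth, from reading the aggregate filter docs I suspect I need something like the sketch below to drop the original events, so that only the pushed map events reach the output. The `event.cancel()` call and the single aggregate block are my assumptions from the docs' examples, I'm not sure this is the right fix:

```
filter {
  aggregate {
    task_id => "%{reg512UPNRequestor}-%{reg512Created_day}"
    code => "
      map['regCount'] ||= 0
      map['regCount'] += 1
      event.cancel()   # drop the original document; only the aggregated map event is emitted
    "
    push_previous_map_as_event => true
    timeout => 45
  }
}
```

(The docs also say the aggregate filter only works with a single worker, i.e. `pipeline.workers: 1`.)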